diff options
| author | Joe Kirchoff <[email protected]> | 2022-03-17 09:55:09 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-03-17 09:55:09 -0700 |
| commit | 7466cb93fbb9f4082dc253a328222dac8bbe58e4 (patch) | |
| tree | 2b60020b7ab15867bfabf135bf8217aabe553c6d /zenserver/upstream/upstreamapply.cpp | |
| parent | Introduced basic validation of the clang-format version (diff) | |
| download | zen-7466cb93fbb9f4082dc253a328222dac8bbe58e4.tar.xz zen-7466cb93fbb9f4082dc253a328222dac8bbe58e4.zip | |
Update horde compute to use Jupiter for storage (#60)
Diffstat (limited to 'zenserver/upstream/upstreamapply.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 295 |
1 files changed, 187 insertions, 108 deletions
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 63c334265..918697224 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -23,6 +23,9 @@ # include <zenstore/cas.h> # include <zenstore/cidstore.h> +# include <auth/authmgr.h> +# include <upstream/upstreamcache.h> + # include "cache/structuredcachestore.h" # include "diag/logging.h" @@ -48,17 +51,76 @@ namespace detail { class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint { public: - HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& Options, - std::unique_ptr<zen::CloudCacheTokenProvider> TokenProvider, - CasStore& CasStore, - CidStore& CidStore) + HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CasStore& CasStore, + CidStore& CidStore, + AuthMgr& Mgr) : m_Log(logging::Get("upstream-apply")) + , m_AuthMgr(Mgr) , m_CasStore(CasStore) , m_CidStore(CidStore) { - m_DisplayName = fmt::format("Horde - '{}'", Options.ServiceUrl); - m_Client = new CloudCacheClient(Options, std::move(TokenProvider)); + m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl); m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString()); + + { + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + + if (ComputeAuthConfig.OAuthUrl.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl, + .ClientId = ComputeAuthConfig.OAuthClientId, + .ClientSecret = ComputeAuthConfig.OAuthClientSecret}); + } + else if (ComputeAuthConfig.OpenIdProvider.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() { + AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else + { + CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken), + .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; + TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + } + + m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider)); + } + + { + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + + if (StorageAuthConfig.OAuthUrl.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl, + .ClientId = StorageAuthConfig.OAuthClientId, + .ClientSecret = StorageAuthConfig.OAuthClientSecret}); + } + else if (StorageAuthConfig.OpenIdProvider.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() { + AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else + { + CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken), + .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; + TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + } + + m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider)); + } } virtual ~HordeUpstreamApplyEndpoint() = default; @@ -109,10 +171,11 @@ namespace detail { m_PendingTasks[UpstreamData.TaskId] = ApplyRecord; } - CloudCacheSession Session(m_Client); + CloudCacheSession ComputeSession(m_Client); + CloudCacheSession StorageSession(m_StorageClient); { - CloudCacheResult Result = BatchPutBlobsIfMissing(Session, UpstreamData.Blobs); + CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -126,7 +189,7 @@ namespace detail { } { - CloudCacheResult Result = BatchPutObjectsIfMissing(Session, UpstreamData.Objects); + CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -136,17 +199,33 @@ namespace detail { .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } + + Result = StorageSession.PutRef("requests"sv, + UpstreamData.TaskId, + UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), + ZenContentType::kCbObject); + + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to add task ref"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } UpstreamData.Objects.clear(); } CbObjectWriter Writer; + Writer.AddString("c"sv, m_ChannelId); Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); Writer.BeginArray("t"sv); Writer.AddObjectAttachment(UpstreamData.TaskId); Writer.EndArray(); IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer(); - CloudCacheResult Result = Session.PostComputeTasks(m_ChannelId, TasksData); + CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -214,8 +293,8 @@ namespace detail { { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, - .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put blobs"}; + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"}; } } @@ -265,8 +344,8 @@ namespace detail { { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, - .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put objects"}; + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"}; } } @@ -330,9 +409,10 @@ namespace detail { try { - CloudCacheSession Session(m_Client); + CloudCacheSession ComputeSession(m_Client); + CloudCacheSession StorageSession(m_StorageClient); - CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId); + CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId); Bytes += UpdatesResult.Bytes; ElapsedSeconds += UpdatesResult.ElapsedSeconds; if (!UpdatesResult.Success) @@ -349,9 +429,6 @@ namespace detail { CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response); - // zen::StringBuilder<4096> ObjStr; - // zen::CompactBinaryToJson(TaskStatus, ObjStr); - for (auto& It : TaskStatus["u"sv]) { CbObjectView Status = It.AsObjectView(); @@ -366,7 +443,7 @@ namespace detail { continue; } - const IoHash TaskId = Status["h"sv].AsObjectAttachment(); + const IoHash TaskId = Status["h"sv].AsHash(); IoHash WorkerId; IoHash ActionId; @@ -383,7 +460,7 @@ namespace detail { m_PendingTasks.erase(TaskIt); } - GetUpstreamApplyResult Result = ProcessTaskStatus(Status, Session); + GetUpstreamApplyResult Result = ProcessTaskStatus(Status, StorageSession); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; @@ -411,9 +488,11 @@ namespace detail { CasStore& m_CasStore; CidStore& m_CidStore; + AuthMgr& m_AuthMgr; spdlog::logger& m_Log; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; + RefPtr<CloudCacheClient> m_StorageClient; UpstreamApplyEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; std::string m_ChannelId; @@ -453,9 +532,9 @@ namespace detail { return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}", ComputeTaskOutcomeToString(Outcome))}}; } - const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment(); + const IoHash TaskId = TaskStatus["h"sv].AsHash(); const DateTime Time = TaskStatus["t"sv].AsDateTime(); - const IoHash ResultHash = TaskStatus["r"sv].AsObjectAttachment(); + const IoHash ResultHash = TaskStatus["r"sv].AsHash(); const std::string_view AgentId = TaskStatus["a"sv].AsString(); const std::string_view LeaseId = TaskStatus["l"sv].AsString(); @@ -463,105 +542,96 @@ namespace detail { double ElapsedSeconds{}; // Get Result object and all Object Attachments + Binary Attachment IDs - CloudCacheResult ObjectTreeResult = Session.GetObjectTree(ResultHash); - Bytes += ObjectTreeResult.Bytes; - ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; + CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject); + Bytes += ObjectRefResult.Bytes; + ElapsedSeconds += ObjectRefResult.ElapsedSeconds; - if (!ObjectTreeResult.Success) + if (!ObjectRefResult.Success) { return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } - std::map<IoHash, IoBuffer> TreeObjectData; - std::map<IoHash, IoBuffer> TreeBinaryData; + std::vector<IoHash> ObjectsToIterate; + std::map<IoHash, IoBuffer> ObjectData; + std::map<IoHash, IoBuffer> BinaryData; - MemoryView ResponseView = ObjectTreeResult.Response; - while (ResponseView.GetSize() > 0) - { - CbFieldView Field = CbFieldView(ResponseView.GetData()); - ResponseView += Field.GetSize(); + ObjectData[ResultHash] = ObjectRefResult.Response; + CbObject Object = LoadCompactBinaryObject(ObjectData[ResultHash]); + Object.IterateAttachments([&](CbFieldView Field) { if (Field.IsObjectAttachment()) { - const IoHash Hash = Field.AsObjectAttachment(); - Field = CbFieldView(ResponseView.GetData()); - ResponseView += Field.GetSize(); - if (!Field.IsObject()) // No data + const IoHash AttachmentHash = Field.AsObjectAttachment(); + if (!ObjectData.contains(AttachmentHash)) { - TreeObjectData[Hash] = {}; - continue; + ObjectsToIterate.push_back(AttachmentHash); } - MemoryView FieldView = Field.AsObjectView().GetView(); - - TreeObjectData[Hash] = IoBuffer(IoBuffer::Wrap, FieldView.GetData(), FieldView.GetSize()); } else if (Field.IsBinaryAttachment()) { - const IoHash Hash = Field.AsBinaryAttachment(); - TreeBinaryData[Hash] = {}; - } - else // Unknown type - { + const IoHash AttachmentHash = Field.AsBinaryAttachment(); + BinaryData[AttachmentHash] = {}; } - } + }); - for (auto& It : TreeObjectData) + while (!ObjectsToIterate.empty()) { - if (It.second.GetSize() == 0) + const IoHash Hash = ObjectsToIterate.back(); + ObjectsToIterate.pop_back(); + + CloudCacheResult ObjectResult = Session.GetObject(Hash); + Bytes += ObjectRefResult.Bytes; + ElapsedSeconds += ObjectRefResult.ElapsedSeconds; + if (!ObjectResult.Success) { - CloudCacheResult ObjectResult = Session.GetObject(It.first); - Bytes += ObjectTreeResult.Bytes; - ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (!ObjectTreeResult.Success) + return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + ObjectData[Hash] = std::move(ObjectResult.Response); + + CbObject IterateObject = LoadCompactBinaryObject(ObjectData[Hash]); + IterateObject.IterateAttachments([&](CbFieldView Field) { + if (Field.IsObjectAttachment()) { - return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + const IoHash AttachmentHash = Field.AsObjectAttachment(); + if (!ObjectData.contains(AttachmentHash)) + { + ObjectsToIterate.push_back(AttachmentHash); + } } - - if (!ObjectResult.Success) + else if (Field.IsBinaryAttachment()) { - return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + const IoHash AttachmentHash = Field.AsBinaryAttachment(); + BinaryData[AttachmentHash] = {}; } - It.second = std::move(ObjectResult.Response); - } + }); } - for (auto& It : TreeBinaryData) + // Batch load all binary data + for (auto& It : BinaryData) { - if (It.second.GetSize() == 0) + CloudCacheResult BlobResult = Session.GetBlob(It.first); + Bytes += ObjectRefResult.Bytes; + ElapsedSeconds += ObjectRefResult.ElapsedSeconds; + if (!BlobResult.Success) { - CloudCacheResult BlobResult = Session.GetBlob(It.first); - Bytes += ObjectTreeResult.Bytes; - ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (!BlobResult.Success) - { - return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - - if (!BlobResult.Success) - { - return {.Error{.ErrorCode = -1, .Reason = "Failed to get result binary attachment data"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - It.second = std::move(BlobResult.Response); + return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; } + It.second = std::move(BlobResult.Response); } - CbObject ResultObject = LoadCompactBinaryObject(TreeObjectData[ResultHash]); + CbObject ResultObject = LoadCompactBinaryObject(ObjectData[ResultHash]); int32_t ExitCode = ResultObject["e"sv].AsInt32(); IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment(); IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment(); IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment(); - std::string StdOut = std::string((const char*)TreeBinaryData[StdOutHash].GetData(), TreeBinaryData[StdOutHash].GetSize()); - std::string StdErr = std::string((const char*)TreeBinaryData[StdErrHash].GetData(), TreeBinaryData[StdErrHash].GetSize()); + std::string StdOut = std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize()); + std::string StdErr = std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize()); if (ExitCode != 0) { @@ -572,7 +642,7 @@ namespace detail { .StdErr = std::move(StdErr)}; } - CbObject OutputObject = LoadCompactBinaryObject(TreeObjectData[OutputHash]); + CbObject OutputObject = LoadCompactBinaryObject(ObjectData[OutputHash]); // Get build.output IoHash BuildOutputId; @@ -583,7 +653,7 @@ namespace detail { if (FileObject["n"sv].AsString() == "Build.output"sv) { BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); - BuildOutput = TreeBinaryData[BuildOutputId]; + BuildOutput = BinaryData[BuildOutputId]; break; } } @@ -604,7 +674,7 @@ namespace detail { const CbObjectView DirectoryObject = It.AsObjectView(); if (DirectoryObject["n"sv].AsString() == "Outputs"sv) { - OutputDirectoryTree = TreeObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; + OutputDirectoryTree = ObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; break; } } @@ -636,7 +706,7 @@ namespace detail { // Hash is the compressed data hash, and how it is stored in Horde IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); - if (!TreeBinaryData.contains(CompressedId)) + if (!BinaryData.contains(CompressedId)) { Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString()); return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"}, @@ -659,7 +729,7 @@ namespace detail { } const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); - if (!TreeBinaryData.contains(CompressedId)) + if (!BinaryData.contains(CompressedId)) { Log().warn("Missing output {} compressed {} uncompressed", CompressedId.ToHexString(), @@ -668,7 +738,7 @@ namespace detail { return; } - CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(TreeBinaryData[CompressedId])); + CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId])); if (!AttachmentBuffer) { @@ -826,11 +896,11 @@ namespace detail { int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64(); bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); - // TODO: Remove override when Horde accepts the UE style Host Platforms (Win64, Linux, Mac) std::string Condition = fmt::format("Platform == '{}'", HostPlatform); if (HostPlatform == "Win64") { - Condition += " && Pool == 'Win-RemoteExec'"; + // TODO + // Condition += " && Pool == 'Win-RemoteExec'"; } std::map<std::string_view, int64_t> Resources; @@ -1176,10 +1246,10 @@ struct UpstreamApplyStats ////////////////////////////////////////////////////////////////////////// -class DefaultUpstreamApply final : public UpstreamApply +class UpstreamApplyImpl final : public UpstreamApply { public: - DefaultUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) + UpstreamApplyImpl(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) : m_Log(logging::Get("upstream-apply")) , m_Options(Options) , m_CasStore(CasStore) @@ -1188,7 +1258,7 @@ public: { } - virtual ~DefaultUpstreamApply() { Shutdown(); } + virtual ~UpstreamApplyImpl() { Shutdown(); } virtual bool Initialize() override { @@ -1213,12 +1283,12 @@ public: for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { - m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this); + m_UpstreamThreads.emplace_back(&UpstreamApplyImpl::ProcessUpstreamQueue, this); } - m_UpstreamUpdatesThread = std::thread(&DefaultUpstreamApply::ProcessUpstreamUpdates, this); + m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this); - m_EndpointMonitorThread = std::thread(&DefaultUpstreamApply::MonitorEndpoints, this); + m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this); } return m_RunState.IsRunning; @@ -1558,18 +1628,27 @@ private: ////////////////////////////////////////////////////////////////////////// std::unique_ptr<UpstreamApply> -MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) +UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) { - return std::make_unique<DefaultUpstreamApply>(Options, CasStore, CidStore); + return std::make_unique<UpstreamApplyImpl>(Options, CasStore, CidStore); } std::unique_ptr<UpstreamApplyEndpoint> -MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, - std::unique_ptr<CloudCacheTokenProvider> TokenProvider, - CasStore& CasStore, - CidStore& CidStore) +UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CasStore& CasStore, + CidStore& CidStore, + AuthMgr& Mgr) { - return std::make_unique<detail::HordeUpstreamApplyEndpoint>(Options, std::move(TokenProvider), CasStore, CidStore); + return std::make_unique<detail::HordeUpstreamApplyEndpoint>(ComputeOptions, + ComputeAuthConfig, + StorageOptions, + StorageAuthConfig, + CasStore, + CidStore, + Mgr); } } // namespace zen |