// Copyright Epic Games, Inc. All Rights Reserved. #include "upstreamapply.h" #if ZEN_WITH_COMPUTE_SERVICES # include "jupiter.h" # include "zen.h" # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include "cache/structuredcachestore.h" # include "diag/logging.h" # include # include # include # include # include # include # include # include namespace zen { using namespace std::literals; static const IoBuffer EmptyBuffer; static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer); namespace detail { class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint { public: HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions, const UpstreamAuthConfig& ComputeAuthConfig, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& StorageAuthConfig, CasStore& CasStore, CidStore& CidStore, AuthMgr& Mgr) : m_Log(logging::Get("upstream-apply")) , m_AuthMgr(Mgr) , m_CasStore(CasStore) , m_CidStore(CidStore) { m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl); m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString()); { std::unique_ptr TokenProvider; if (ComputeAuthConfig.OAuthUrl.empty() == false) { TokenProvider = CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl, .ClientId = ComputeAuthConfig.OAuthClientId, .ClientSecret = ComputeAuthConfig.OAuthClientSecret}); } else if (ComputeAuthConfig.OpenIdProvider.empty() == false) { TokenProvider = CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() { AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; }); } else { CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken), .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); } m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider)); } { std::unique_ptr TokenProvider; if (StorageAuthConfig.OAuthUrl.empty() == false) { TokenProvider = CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl, .ClientId = StorageAuthConfig.OAuthClientId, .ClientSecret = StorageAuthConfig.OAuthClientSecret}); } else if (StorageAuthConfig.OpenIdProvider.empty() == false) { TokenProvider = CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() { AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; }); } else { CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken), .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); } m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider)); } } virtual ~HordeUpstreamApplyEndpoint() = default; virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } virtual bool IsHealthy() const override { return m_HealthOk.load(); } virtual UpstreamEndpointHealth CheckHealth() override { try { CloudCacheSession Session(m_Client); CloudCacheResult Result = Session.Authenticate(); m_HealthOk = Result.ErrorCode == 0; return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; } catch (std::exception& Err) { return {.Reason = Err.what(), .Ok = false}; } } virtual std::string_view DisplayName() const override { return m_DisplayName; } virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) override { int64_t Bytes{}; double ElapsedSeconds{}; try { UpstreamData UpstreamData; if (!ProcessApplyKey(ApplyRecord, UpstreamData)) { return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}}; } { std::scoped_lock Lock(m_TaskMutex); if (m_PendingTasks.contains(UpstreamData.TaskId)) { // Pending task is already queued, return success return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; } m_PendingTasks[UpstreamData.TaskId] = ApplyRecord; } CloudCacheSession ComputeSession(m_Client); CloudCacheSession StorageSession(m_StorageClient); { CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } UpstreamData.Blobs.clear(); } { CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload objects"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } Result = StorageSession.PutRef("requests"sv, UpstreamData.TaskId, UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), ZenContentType::kCbObject); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to add task ref"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } UpstreamData.Objects.clear(); } CbObjectWriter Writer; Writer.AddString("c"sv, m_ChannelId); Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); Writer.BeginArray("t"sv); Writer.AddObjectAttachment(UpstreamData.TaskId); Writer.EndArray(); IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer(); CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { { std::scoped_lock Lock(m_TaskMutex); m_PendingTasks.erase(UpstreamData.TaskId); } return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; } catch (std::exception& Err) { m_HealthOk = false; return {.Error{.ErrorCode = -1, .Reason = Err.what()}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } } [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map& Blobs) { if (Blobs.size() == 0) { return {.Success = true}; } int64_t Bytes{}; double ElapsedSeconds{}; // Batch check for missing blobs std::set Keys; for (const auto& It : Blobs) { Keys.insert(It.first); } CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); ElapsedSeconds += ExistsResult.ElapsedSeconds; if (!ExistsResult.Success) { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"}; } // TODO: Batch upload missing blobs for (const auto& It : Blobs) { if (ExistsResult.Have.contains(It.first)) { continue; } CloudCacheResult Result = Session.PutBlob(It.first, It.second); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"}; } } return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; } [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map& Objects) { if (Objects.size() == 0) { return {.Success = true}; } int64_t Bytes{}; double ElapsedSeconds{}; // Batch check for missing objects std::set Keys; for (const auto& It : Objects) { Keys.insert(It.first); } CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); ElapsedSeconds += ExistsResult.ElapsedSeconds; if (!ExistsResult.Success) { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if objects exist"}; } // TODO: Batch upload missing objects for (const auto& It : Objects) { if (ExistsResult.Have.contains(It.first)) { continue; } CloudCacheResult Result = Session.PutObject(It.first, It.second.GetBuffer().AsIoBuffer()); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"}; } } return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; } enum class ComputeTaskState : int32_t { Queued = 0, Executing = 1, Complete = 2, }; enum class ComputeTaskOutcome : int32_t { Success = 0, Failed = 1, Cancelled = 2, NoResult = 3, Exipred = 4, BlobNotFound = 5, Exception = 6, }; std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome) { switch (Outcome) { case ComputeTaskOutcome::Success: return "Success"sv; case ComputeTaskOutcome::Failed: return "Failed"sv; case ComputeTaskOutcome::Cancelled: return "Cancelled"sv; case ComputeTaskOutcome::NoResult: return "NoResult"sv; case ComputeTaskOutcome::Exipred: return "Exipred"sv; case ComputeTaskOutcome::BlobNotFound: return "BlobNotFound"sv; case ComputeTaskOutcome::Exception: return "Exception"sv; }; return "Unknown"sv; } virtual GetUpstreamApplyUpdatesResult GetUpdates() override { int64_t Bytes{}; double ElapsedSeconds{}; UpstreamApplyCompleted CompletedTasks; { std::scoped_lock Lock(m_TaskMutex); if (m_PendingTasks.empty()) { // Nothing to do. return {.Success = true}; } } try { CloudCacheSession ComputeSession(m_Client); CloudCacheSession StorageSession(m_StorageClient); CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId); Bytes += UpdatesResult.Bytes; ElapsedSeconds += UpdatesResult.ElapsedSeconds; if (!UpdatesResult.Success) { return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } if (!UpdatesResult.Success) { return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response); for (auto& It : TaskStatus["u"sv]) { CbObjectView Status = It.AsObjectView(); const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); const std::string_view AgentId = TaskStatus["a"sv].AsString(); const std::string_view LeaseId = TaskStatus["l"sv].AsString(); // Only care about completed tasks if (State != ComputeTaskState::Complete) { continue; } const IoHash TaskId = Status["h"sv].AsHash(); IoHash WorkerId; IoHash ActionId; { std::scoped_lock Lock(m_TaskMutex); auto TaskIt = m_PendingTasks.find(TaskId); if (TaskIt == m_PendingTasks.end()) { continue; } WorkerId = TaskIt->second.WorkerDescriptor.GetHash(); ActionId = TaskIt->second.Action.GetHash(); m_PendingTasks.erase(TaskIt); } GetUpstreamApplyResult Result = ProcessTaskStatus(Status, StorageSession); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; CompletedTasks[WorkerId][ActionId] = std::move(Result); } return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; } catch (std::exception& Err) { m_HealthOk = false; return { .Error{.ErrorCode = -1, .Reason = Err.what()}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), }; } } virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; } private: spdlog::logger& Log() { return m_Log; } CasStore& m_CasStore; CidStore& m_CidStore; AuthMgr& m_AuthMgr; spdlog::logger& m_Log; std::string m_DisplayName; RefPtr m_Client; RefPtr m_StorageClient; UpstreamApplyEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; std::string m_ChannelId; std::mutex m_TaskMutex; std::unordered_map m_PendingTasks; struct UpstreamData { std::map Blobs; std::map Objects; IoHash TaskId; IoHash RequirementsId; }; struct UpstreamDirectory { std::filesystem::path Path; std::map Directories; std::set Files; }; [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const CbObjectView& TaskStatus, CloudCacheSession& Session) { try { const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)TaskStatus["o"sv].AsInt32(); if (Outcome != ComputeTaskOutcome::Success) { const std::string_view Detail = TaskStatus["d"sv].AsString(); if (!Detail.empty()) { return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}: {}", ComputeTaskOutcomeToString(Outcome), std::string(Detail))}}; } return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}", ComputeTaskOutcomeToString(Outcome))}}; } const IoHash TaskId = TaskStatus["h"sv].AsHash(); const DateTime Time = TaskStatus["t"sv].AsDateTime(); const IoHash ResultHash = TaskStatus["r"sv].AsHash(); const std::string_view AgentId = TaskStatus["a"sv].AsString(); const std::string_view LeaseId = TaskStatus["l"sv].AsString(); int64_t Bytes{}; double ElapsedSeconds{}; // Get Result object and all Object Attachments + Binary Attachment IDs CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject); Bytes += ObjectRefResult.Bytes; ElapsedSeconds += ObjectRefResult.ElapsedSeconds; if (!ObjectRefResult.Success) { return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } std::vector ObjectsToIterate; std::map ObjectData; std::map BinaryData; ObjectData[ResultHash] = ObjectRefResult.Response; CbObject Object = LoadCompactBinaryObject(ObjectData[ResultHash]); Object.IterateAttachments([&](CbFieldView Field) { if (Field.IsObjectAttachment()) { const IoHash AttachmentHash = Field.AsObjectAttachment(); if (!ObjectData.contains(AttachmentHash)) { ObjectsToIterate.push_back(AttachmentHash); } } else if (Field.IsBinaryAttachment()) { const IoHash AttachmentHash = Field.AsBinaryAttachment(); BinaryData[AttachmentHash] = {}; } }); while (!ObjectsToIterate.empty()) { const IoHash Hash = ObjectsToIterate.back(); ObjectsToIterate.pop_back(); CloudCacheResult ObjectResult = Session.GetObject(Hash); Bytes += ObjectRefResult.Bytes; ElapsedSeconds += ObjectRefResult.ElapsedSeconds; if (!ObjectResult.Success) { return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } ObjectData[Hash] = std::move(ObjectResult.Response); CbObject IterateObject = LoadCompactBinaryObject(ObjectData[Hash]); IterateObject.IterateAttachments([&](CbFieldView Field) { if (Field.IsObjectAttachment()) { const IoHash AttachmentHash = Field.AsObjectAttachment(); if (!ObjectData.contains(AttachmentHash)) { ObjectsToIterate.push_back(AttachmentHash); } } else if (Field.IsBinaryAttachment()) { const IoHash AttachmentHash = Field.AsBinaryAttachment(); BinaryData[AttachmentHash] = {}; } }); } // Batch load all binary data for (auto& It : BinaryData) { CloudCacheResult BlobResult = Session.GetBlob(It.first); Bytes += ObjectRefResult.Bytes; ElapsedSeconds += ObjectRefResult.ElapsedSeconds; if (!BlobResult.Success) { return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } It.second = std::move(BlobResult.Response); } CbObject ResultObject = LoadCompactBinaryObject(ObjectData[ResultHash]); int32_t ExitCode = ResultObject["e"sv].AsInt32(); IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment(); IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment(); IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment(); std::string StdOut = std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize()); std::string StdErr = std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize()); if (ExitCode != 0) { return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .StdOut = std::move(StdOut), .StdErr = std::move(StdErr)}; } CbObject OutputObject = LoadCompactBinaryObject(ObjectData[OutputHash]); // Get build.output IoHash BuildOutputId; IoBuffer BuildOutput; for (auto& It : OutputObject["f"sv]) { const CbObjectView FileObject = It.AsObjectView(); if (FileObject["n"sv].AsString() == "Build.output"sv) { BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); BuildOutput = BinaryData[BuildOutputId]; break; } } if (BuildOutput.GetSize() == 0) { return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .StdOut = std::move(StdOut), .StdErr = std::move(StdErr)}; } // Get Output directory node IoBuffer OutputDirectoryTree; for (auto& It : OutputObject["d"sv]) { const CbObjectView DirectoryObject = It.AsObjectView(); if (DirectoryObject["n"sv].AsString() == "Outputs"sv) { OutputDirectoryTree = ObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; break; } } if (OutputDirectoryTree.GetSize() == 0) { return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .StdOut = std::move(StdOut), .StdErr = std::move(StdErr)}; } // load build.output as CbObject // Move Outputs from Horde to CbPackage std::unordered_map CidToCompressedId; CbPackage OutputPackage; CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree); int64_t TotalAttachmentBytes = 0; int64_t TotalRawAttachmentBytes = 0; for (auto& It : OutputDirectoryTreeObject["f"sv]) { CbObjectView FileObject = It.AsObjectView(); // Name is the uncompressed hash IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString()); // Hash is the compressed data hash, and how it is stored in Horde IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); if (!BinaryData.contains(CompressedId)) { Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString()); return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } CidToCompressedId[DecompressedId] = CompressedId; } // Iterate attachments, verify all chunks exist, and add to CbPackage bool AnyErrors = false; CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput); BuildOutputObject.IterateAttachments([&](CbFieldView Field) { const IoHash DecompressedId = Field.AsHash(); if (!CidToCompressedId.contains(DecompressedId)) { Log().warn("Attachment not found {}", DecompressedId.ToHexString()); AnyErrors = true; return; } const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); if (!BinaryData.contains(CompressedId)) { Log().warn("Missing output {} compressed {} uncompressed", CompressedId.ToHexString(), DecompressedId.ToHexString()); AnyErrors = true; return; } CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId])); if (!AttachmentBuffer) { Log().warn("Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", CompressedId.ToHexString(), DecompressedId.ToHexString()); AnyErrors = true; return; } TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize(); CbAttachment Attachment(AttachmentBuffer); OutputPackage.AddAttachment(Attachment); }); if (AnyErrors) { return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .StdOut = std::move(StdOut), .StdErr = std::move(StdErr)}; } OutputPackage.SetObject(BuildOutputObject); return {.OutputPackage = std::move(OutputPackage), .TotalAttachmentBytes = TotalAttachmentBytes, .TotalRawAttachmentBytes = TotalRawAttachmentBytes, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .StdOut = std::move(StdOut), .StdErr = std::move(StdErr), .Success = true}; } catch (std::exception& Err) { return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) { std::string ExecutablePath; std::map Environment; std::set InputFiles; std::map InputFileHashes; ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString(); if (ExecutablePath.empty()) { Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor", ApplyRecord.WorkerDescriptor.GetHash()); return false; } for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv]) { CbObjectView FileEntry = It.AsObjectView(); if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) { return false; } } for (auto& It : ApplyRecord.WorkerDescriptor["files"sv]) { CbObjectView FileEntry = It.AsObjectView(); if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) { return false; } } for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv]) { std::string_view Directory = It.AsString(); std::string DummyFile = fmt::format("{}/.zen_empty_file", Directory); InputFiles.insert(DummyFile); Data.Blobs[EmptyBufferId] = EmptyBuffer; InputFileHashes[DummyFile] = EmptyBufferId; } for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv]) { std::string_view Env = It.AsString(); auto Index = Env.find('='); if (Index < 0) { Log().warn("process apply upstream FAILED, environment '{}' malformed", Env); return false; } Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1); } { static const std::filesystem::path BuildActionPath = "Build.action"sv; static const std::filesystem::path InputPath = "Inputs"sv; const IoHash ActionId = ApplyRecord.Action.GetHash(); InputFiles.insert(BuildActionPath); InputFileHashes[BuildActionPath] = ActionId; Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(), ApplyRecord.Action.GetBuffer().GetSize()); bool AnyErrors = false; ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { const IoHash Cid = Field.AsHash(); const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); if (!DataBuffer) { Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); AnyErrors = true; return; } if (InputFiles.contains(FilePath)) { return; } const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize()); InputFiles.insert(FilePath); InputFileHashes[FilePath] = CompressedId; Data.Blobs[CompressedId] = std::move(DataBuffer); }); if (AnyErrors) { return false; } } const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles); CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects); const IoHash SandboxHash = Sandbox.GetHash(); Data.Objects[SandboxHash] = std::move(Sandbox); { std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); if (HostPlatform.empty()) { Log().warn("process apply upstream FAILED, 'host' platform not provided"); return false; } int32_t LogicalCores = ApplyRecord.WorkerDescriptor["cores"sv].AsInt32(); int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64(); bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); std::string Condition = fmt::format("Platform == '{}'", HostPlatform); if (HostPlatform == "Win64") { // TODO // Condition += " && Pool == 'Win-RemoteExec'"; } std::map Resources; if (LogicalCores > 0) { Resources["LogicalCores"sv] = LogicalCores; } if (Memory > 0) { Resources["RAM"sv] = std::max(Memory / 1024 / 1024 / 1024, 1LL); } CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive); const IoHash RequirementsId = Requirements.GetHash(); Data.Objects[RequirementsId] = std::move(Requirements); Data.RequirementsId = RequirementsId; } CbObject Task = BuildTask(ExecutablePath, {"-Build=build.action"}, Environment, {}, SandboxHash, Data.RequirementsId, {"Build.output", "Outputs"}); const IoHash TaskId = Task.GetHash(); Data.Objects[TaskId] = std::move(Task); Data.TaskId = TaskId; return true; } [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry, std::set& InputFiles, std::map& InputFileHashes, std::map& Blobs) { const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); const IoHash ChunkId = FileEntry["hash"sv].AsHash(); const uint64_t Size = FileEntry["size"sv].AsUInt64(); IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId); if (!DataBuffer) { Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); return false; } if (DataBuffer.Size() != Size) { Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}", ChunkId, DataBuffer.Size(), Size); return false; } if (InputFiles.contains(FilePath)) { Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath); return false; } InputFiles.insert(FilePath); InputFileHashes[FilePath] = ChunkId; Blobs[ChunkId] = std::move(DataBuffer); return true; } [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set& InputFiles) { static const std::filesystem::path RootPath; std::map AllDirectories; UpstreamDirectory RootDirectory = {.Path = RootPath}; AllDirectories[RootPath] = &RootDirectory; // Build tree from flat list for (const auto& Path : InputFiles) { if (Path.has_parent_path()) { if (!AllDirectories.contains(Path.parent_path())) { std::stack PathSplit; { std::filesystem::path ParentPath = Path.parent_path(); PathSplit.push(ParentPath.filename().string()); while (ParentPath.has_parent_path()) { ParentPath = ParentPath.parent_path(); PathSplit.push(ParentPath.filename().string()); } } UpstreamDirectory* ParentPtr = &RootDirectory; while (!PathSplit.empty()) { if (!ParentPtr->Directories.contains(PathSplit.top())) { std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()}; ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath}; AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()]; } ParentPtr = &ParentPtr->Directories[PathSplit.top()]; PathSplit.pop(); } } AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string()); } else { RootDirectory.Files.insert(Path.filename().string()); } } return RootDirectory; } [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory, const std::map& InputFileHashes, const std::map& Blobs, std::map& Objects) { CbObjectWriter DirectoryTreeWriter; if (!RootDirectory.Files.empty()) { DirectoryTreeWriter.BeginArray("f"sv); for (const auto& File : RootDirectory.Files) { const std::filesystem::path FilePath = {RootDirectory.Path / File}; const IoHash& FileHash = InputFileHashes.at(FilePath); const uint64_t FileSize = Blobs.at(FileHash).Size(); DirectoryTreeWriter.BeginObject(); DirectoryTreeWriter.AddString("n"sv, File); DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash); DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded DirectoryTreeWriter.EndObject(); } DirectoryTreeWriter.EndArray(); } if (!RootDirectory.Directories.empty()) { DirectoryTreeWriter.BeginArray("d"sv); for (const auto& Item : RootDirectory.Directories) { CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects); const IoHash DirectoryHash = Directory.GetHash(); Objects[DirectoryHash] = std::move(Directory); DirectoryTreeWriter.BeginObject(); DirectoryTreeWriter.AddString("n"sv, Item.first); DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash); DirectoryTreeWriter.EndObject(); } DirectoryTreeWriter.EndArray(); } return std::move(DirectoryTreeWriter.Save()); } [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, const std::map& Resources, const bool Exclusive) { CbObjectWriter Writer; Writer.AddString("c", Condition); if (!Resources.empty()) { Writer.BeginArray("r"); for (const auto& Resource : Resources) { Writer.BeginArray(); Writer.AddString(Resource.first); Writer.AddInteger(Resource.second); Writer.EndArray(); } Writer.EndArray(); } Writer.AddBool("e", Exclusive); return std::move(Writer.Save()); } [[nodiscard]] CbObject BuildTask(const std::string_view Executable, const std::vector& Arguments, const std::map& Environment, const std::string_view WorkingDirectory, const IoHash& SandboxHash, const IoHash& RequirementsId, const std::set& Outputs) { CbObjectWriter TaskWriter; TaskWriter.AddString("e"sv, Executable); if (!Arguments.empty()) { TaskWriter.BeginArray("a"sv); for (const auto& Argument : Arguments) { TaskWriter.AddString(Argument); } TaskWriter.EndArray(); } if (!Environment.empty()) { TaskWriter.BeginArray("v"sv); for (const auto& Env : Environment) { TaskWriter.BeginArray(); TaskWriter.AddString(Env.first); TaskWriter.AddString(Env.second); TaskWriter.EndArray(); } TaskWriter.EndArray(); } if (!WorkingDirectory.empty()) { TaskWriter.AddString("s"sv, WorkingDirectory); } TaskWriter.AddObjectAttachment("s"sv, SandboxHash); TaskWriter.AddObjectAttachment("r"sv, RequirementsId); // Outputs if (!Outputs.empty()) { TaskWriter.BeginArray("o"sv); for (const auto& Output : Outputs) { TaskWriter.AddString(Output); } TaskWriter.EndArray(); } return std::move(TaskWriter.Save()); } }; } // namespace detail ////////////////////////////////////////////////////////////////////////// struct UpstreamApplyStats { static constexpr uint64_t MaxSampleCount = 1000ull; UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {} void Add(spdlog::logger& Logger, UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result, const std::vector>& Endpoints) { UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) { Stats.ErrorCount++; } else if (Result.Success) { Stats.PostCount++; Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); } if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } } void Add(spdlog::logger& Logger, UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result, const std::vector>& Endpoints) { UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) { Stats.ErrorCount++; } else if (Result.Success) { Stats.UpdateCount++; Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); Stats.SecondsDown.fetch_add(Result.ElapsedSeconds); if (!Result.Completed.empty()) { uint64_t Completed = 0; for (auto& It : Result.Completed) { Completed += It.second.size(); } Stats.CompleteCount.fetch_add(Completed); } } if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } } void Dump(spdlog::logger& Logger, const std::vector>& Endpoints) { for (auto& Ep : Endpoints) { // These stats will not be totally correct as the numbers are not captured atomically UpstreamApplyEndpointStats& Stats = Ep->Stats(); const uint64_t PostCount = Stats.PostCount; const uint64_t CompleteCount = Stats.CompleteCount; // const uint64_t UpdateCount = Stats.UpdateCount; const double DownBytes = Stats.DownBytes; const double SecondsDown = Stats.SecondsDown; const double UpBytes = Stats.UpBytes; const double SecondsUp = Stats.SecondsUp; const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0; const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0; const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; Logger.debug("STATS - '{}', Complete rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", Ep->DisplayName(), CompleteRate, DownBytes, DownSpeed, UpBytes, UpSpeed); } } bool m_Enabled; std::atomic_uint64_t m_SampleCount = {}; }; ////////////////////////////////////////////////////////////////////////// class UpstreamApplyImpl final : public UpstreamApply { public: UpstreamApplyImpl(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) : m_Log(logging::Get("upstream-apply")) , m_Options(Options) , m_CasStore(CasStore) , m_CidStore(CidStore) , m_Stats(Options.StatsEnabled) { } virtual ~UpstreamApplyImpl() { Shutdown(); } virtual bool Initialize() override { for (auto& Endpoint : m_Endpoints) { const UpstreamEndpointHealth Health = Endpoint->Initialize(); if (Health.Ok) { Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName()); } else { Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); } } m_RunState.IsRunning = !m_Endpoints.empty(); if (m_RunState.IsRunning) { m_ShutdownEvent.Reset(); for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { m_UpstreamThreads.emplace_back(&UpstreamApplyImpl::ProcessUpstreamQueue, this); } m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this); m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this); } return m_RunState.IsRunning; } virtual void RegisterEndpoint(std::unique_ptr Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override { if (m_RunState.IsRunning) { const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); const IoHash ActionId = ApplyRecord.Action.GetHash(); const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300); { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) { // Already in progress return {.ApplyId = ActionId, .Success = true}; } std::chrono::steady_clock::time_point ExpireTime = TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds) : std::chrono::steady_clock::time_point::max(); m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)}; } if (!m_UpstreamThreads.empty()) { m_UpstreamQueue.Enqueue(std::move(ApplyRecord)); } else { ProcessApplyRecord(std::move(ApplyRecord)); } return {.ApplyId = ActionId, .Success = true}; } return {}; } virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override { if (m_RunState.IsRunning) { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) { return {.Status = *Status, .Success = true}; } } return {}; } virtual void GetStatus(CbObjectWriter& Status) override { Status << "worker_threads" << m_Options.ThreadCount; Status << "queue_count" << m_UpstreamQueue.Size(); Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) { Status.BeginObject(); Status << "name" << Ep->DisplayName(); Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); UpstreamApplyEndpointStats& Stats = Ep->Stats(); const uint64_t PostCount = Stats.PostCount; const uint64_t CompleteCount = Stats.CompleteCount; // const uint64_t UpdateCount = Stats.UpdateCount; const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; Status << "post_count" << PostCount; Status << "complete_count" << PostCount; Status << "update_count" << Stats.UpdateCount; Status << "complete_ratio" << CompleteRate; Status << "downloaded_mb" << Stats.DownBytes; Status << "uploaded_mb" << Stats.UpBytes; Status << "error_count" << Stats.ErrorCount; Status.EndObject(); } Status.EndArray(); } private: // The caller is responsible for locking if required UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId) { if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end()) { if (auto It2 = It->second.find(ActionId); It2 != It->second.end()) { return &It2->second; } } return nullptr; } void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord) { const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); const IoHash ActionId = ApplyRecord.Action.GetHash(); try { for (auto& Endpoint : m_Endpoints) { if (Endpoint->IsHealthy()) { PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord)); { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) { if (Result.Success) { Status->State = UpstreamApplyState::Executing; } else { Status->State = UpstreamApplyState::Complete; Status->Result = {.Error = std::move(Result.Error), .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds}; } } } m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); return; } } { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) { Status->State = UpstreamApplyState::Complete; Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}}; } Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId); } } catch (std::exception& e) { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) { Status->State = UpstreamApplyState::Complete; Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}}; } Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what()); } } void ProcessUpstreamQueue() { for (;;) { UpstreamApplyRecord ApplyRecord; if (m_UpstreamQueue.WaitAndDequeue(ApplyRecord)) { ProcessApplyRecord(std::move(ApplyRecord)); } if (!m_RunState.IsRunning) { break; } } } void ProcessApplyUpdates() { for (auto& Endpoint : m_Endpoints) { if (Endpoint->IsHealthy()) { GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); if (!Result.Success) { Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason); } if (!Result.Completed.empty()) { for (auto& It : Result.Completed) { for (auto& It2 : It.second) { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(It.first, It2.first); Status != nullptr) { Status->State = UpstreamApplyState::Complete; Status->Result = std::move(It2.second); } } } } } } } void ProcessUpstreamUpdates() { const auto& UpdateSleep = std::chrono::milliseconds(m_Options.UpdatesInterval); while (!m_ShutdownEvent.Wait(uint32_t(UpdateSleep.count()))) { if (!m_RunState.IsRunning) { break; } ProcessApplyUpdates(); // Remove any expired tasks, regardless of state { std::scoped_lock Lock(m_ApplyTasksMutex); for (auto& WorkerIt : m_ApplyTasks) { const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) { return Item.second.ExpireTime < std::chrono::steady_clock::now(); }); if (Count > 0) { Log().debug("Removed '{}' expired tasks", Count); } } const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); }); if (Count > 0) { Log().debug("Removed '{}' empty task lists", Count); } } } } void MonitorEndpoints() { for (;;) { { std::unique_lock Lock(m_RunState.Mutex); if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) { break; } } for (auto& Endpoint : m_Endpoints) { if (!Endpoint->IsHealthy()) { if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) { Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); } else { Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); } } } } } void Shutdown() { if (m_RunState.Stop()) { m_ShutdownEvent.Set(); m_UpstreamQueue.CompleteAdding(); for (std::thread& Thread : m_UpstreamThreads) { Thread.join(); } m_EndpointMonitorThread.join(); m_UpstreamUpdatesThread.join(); m_UpstreamThreads.clear(); m_Endpoints.clear(); } } spdlog::logger& Log() { return m_Log; } using UpstreamApplyQueue = BlockingQueue; struct RunState { std::mutex Mutex; std::condition_variable ExitSignal; std::atomic_bool IsRunning{false}; bool Stop() { bool Stopped = false; { std::scoped_lock Lock(Mutex); Stopped = IsRunning.exchange(false); } if (Stopped) { ExitSignal.notify_all(); } return Stopped; } }; spdlog::logger& m_Log; UpstreamApplyOptions m_Options; CasStore& m_CasStore; CidStore& m_CidStore; UpstreamApplyQueue m_UpstreamQueue; UpstreamApplyStats m_Stats; UpstreamApplyTasks m_ApplyTasks; std::mutex m_ApplyTasksMutex; std::vector> m_Endpoints; Event m_ShutdownEvent; std::vector m_UpstreamThreads; std::thread m_UpstreamUpdatesThread; std::thread m_EndpointMonitorThread; RunState m_RunState; }; ////////////////////////////////////////////////////////////////////////// std::unique_ptr UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) { return std::make_unique(Options, CasStore, CidStore); } std::unique_ptr UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, const UpstreamAuthConfig& ComputeAuthConfig, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& StorageAuthConfig, CasStore& CasStore, CidStore& CidStore, AuthMgr& Mgr) { return std::make_unique(ComputeOptions, ComputeAuthConfig, StorageOptions, StorageAuthConfig, CasStore, CidStore, Mgr); } } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES