// Copyright Epic Games, Inc. All Rights Reserved. #include "upstreamapply.h" #if ZEN_WITH_COMPUTE_SERVICES # include "jupiter.h" # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include "cache/structuredcachestore.h" # include "diag/logging.h" # include # include # include # include # include namespace zen { using namespace std::literals; static const IoBuffer EmptyBuffer; static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer); namespace detail { class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint { public: HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions, const UpstreamAuthConfig& ComputeAuthConfig, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& StorageAuthConfig, CasStore& CasStore, CidStore& CidStore, AuthMgr& Mgr) : m_Log(logging::Get("upstream-apply")) , m_CasStore(CasStore) , m_CidStore(CidStore) , m_AuthMgr(Mgr) { m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl); m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString()); { std::unique_ptr TokenProvider; if (ComputeAuthConfig.OAuthUrl.empty() == false) { TokenProvider = CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl, .ClientId = ComputeAuthConfig.OAuthClientId, .ClientSecret = ComputeAuthConfig.OAuthClientSecret}); } else if (ComputeAuthConfig.OpenIdProvider.empty() == false) { TokenProvider = CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() { AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; }); } else { CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken), .ExpireTime = 
CloudCacheAccessToken::TimePoint::max()}; TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); } m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider)); } { std::unique_ptr TokenProvider; if (StorageAuthConfig.OAuthUrl.empty() == false) { TokenProvider = CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl, .ClientId = StorageAuthConfig.OAuthClientId, .ClientSecret = StorageAuthConfig.OAuthClientSecret}); } else if (StorageAuthConfig.OpenIdProvider.empty() == false) { TokenProvider = CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() { AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; }); } else { CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken), .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); } m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider)); } } virtual ~HordeUpstreamApplyEndpoint() = default; virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } virtual bool IsHealthy() const override { return m_HealthOk.load(); } virtual UpstreamEndpointHealth CheckHealth() override { try { CloudCacheSession Session(m_Client); CloudCacheResult Result = Session.Authenticate(); m_HealthOk = Result.ErrorCode == 0; return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; } catch (std::exception& Err) { return {.Reason = Err.what(), .Ok = false}; } } virtual std::string_view DisplayName() const override { return m_DisplayName; } virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) override { PostUpstreamApplyResult ApplyResult{}; ApplyResult.Timepoints.merge(ApplyRecord.Timepoints); try { UpstreamData 
UpstreamData; if (!ProcessApplyKey(ApplyRecord, UpstreamData)) { return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}}; } { ApplyResult.Timepoints["zen-storage-build-ref"] = DateTime::NowTicks(); std::scoped_lock Lock(m_TaskMutex); if (m_PendingTasks.contains(UpstreamData.TaskId)) { // Pending task is already queued, return success ApplyResult.Success = true; return ApplyResult; } m_PendingTasks[UpstreamData.TaskId] = std::move(ApplyRecord); } CloudCacheSession ComputeSession(m_Client); CloudCacheSession StorageSession(m_StorageClient); { CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs); ApplyResult.Bytes += Result.Bytes; ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; ApplyResult.Timepoints["zen-storage-upload-blobs"] = DateTime::NowTicks(); if (!Result.Success) { ApplyResult.Error = {.ErrorCode = Result.ErrorCode, .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"}; return ApplyResult; } UpstreamData.Blobs.clear(); } { CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects); ApplyResult.Bytes += Result.Bytes; ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; ApplyResult.Timepoints["zen-storage-upload-objects"] = DateTime::NowTicks(); if (!Result.Success) { ApplyResult.Error = {.ErrorCode = Result.ErrorCode, .Reason = !Result.Reason.empty() ? 
std::move(Result.Reason) : "Failed to upload objects"}; return ApplyResult; } } { PutRefResult RefResult = StorageSession.PutRef("requests"sv, UpstreamData.TaskId, UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), ZenContentType::kCbObject); Log().debug("Put ref {} Need={} Bytes={} Duration={}s Result={}", UpstreamData.TaskId, RefResult.Needs.size(), RefResult.Bytes, RefResult.ElapsedSeconds, RefResult.Success); ApplyResult.Bytes += RefResult.Bytes; ApplyResult.ElapsedSeconds += RefResult.ElapsedSeconds; ApplyResult.Timepoints["zen-storage-put-ref"] = DateTime::NowTicks(); if (RefResult.Needs.size() > 0) { Log().error("Failed to add task ref {} due to {} missing blobs", UpstreamData.TaskId, RefResult.Needs.size()); for (const auto& Hash : RefResult.Needs) { Log().debug("Task ref {} missing blob {}", UpstreamData.TaskId, Hash); } ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode, .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) : "Failed to add task ref due to missing blob"}; return ApplyResult; } if (!RefResult.Success) { ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode, .Reason = !RefResult.Reason.empty() ? 
std::move(RefResult.Reason) : "Failed to add task ref"}; return ApplyResult; } UpstreamData.Objects.clear(); } { CbObjectWriter Writer; Writer.AddString("c"sv, m_ChannelId); Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); Writer.BeginArray("t"sv); Writer.AddObjectAttachment(UpstreamData.TaskId); Writer.EndArray(); CbObject TasksObject = Writer.Save(); IoBuffer TasksData = TasksObject.GetBuffer().AsIoBuffer(); CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData); Log().debug("Post compute task {} Bytes={} Duration={}s Result={}", TasksObject.GetHash(), Result.Bytes, Result.ElapsedSeconds, Result.Success); ApplyResult.Bytes += Result.Bytes; ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; ApplyResult.Timepoints["zen-horde-post-task"] = DateTime::NowTicks(); if (!Result.Success) { { std::scoped_lock Lock(m_TaskMutex); m_PendingTasks.erase(UpstreamData.TaskId); } ApplyResult.Error = {.ErrorCode = Result.ErrorCode, .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"}; return ApplyResult; } } Log().info("Task posted {}", UpstreamData.TaskId); ApplyResult.Success = true; return ApplyResult; } catch (std::exception& Err) { m_HealthOk = false; return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map& Blobs) { if (Blobs.size() == 0) { return {.Success = true}; } int64_t Bytes{}; double ElapsedSeconds{}; // Batch check for missing blobs std::set Keys; std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}", Keys.size(), ExistsResult.Needs.size(), ExistsResult.ElapsedSeconds, ExistsResult.Success); ElapsedSeconds += ExistsResult.ElapsedSeconds; if (!ExistsResult.Success) { return {.Bytes = Bytes, .ElapsedSeconds 
= ElapsedSeconds, .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"}; } for (const auto& Hash : ExistsResult.Needs) { CloudCacheResult Result = Session.PutBlob(Hash, Blobs.at(Hash)); Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"}; } } return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; } [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map& Objects) { if (Objects.size() == 0) { return {.Success = true}; } int64_t Bytes{}; double ElapsedSeconds{}; // Batch check for missing objects std::set Keys; std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); Log().debug("Queried {} missing objects Need={} Duration={}s Result={}", Keys.size(), ExistsResult.Needs.size(), ExistsResult.ElapsedSeconds, ExistsResult.Success); ElapsedSeconds += ExistsResult.ElapsedSeconds; if (!ExistsResult.Success) { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, .Reason = !ExistsResult.Reason.empty() ? 
std::move(ExistsResult.Reason) : "Failed to check if objects exist"}; } for (const auto& Hash : ExistsResult.Needs) { CloudCacheResult Result = Session.PutObject(Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"}; } } return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; } enum class ComputeTaskState : int32_t { Queued = 0, Executing = 1, Complete = 2, }; enum class ComputeTaskOutcome : int32_t { Success = 0, Failed = 1, Cancelled = 2, NoResult = 3, Exipred = 4, BlobNotFound = 5, Exception = 6, }; [[nodiscard]] static std::string_view ComputeTaskStateToString(const ComputeTaskState Outcome) { switch (Outcome) { case ComputeTaskState::Queued: return "Queued"sv; case ComputeTaskState::Executing: return "Executing"sv; case ComputeTaskState::Complete: return "Complete"sv; }; return "Unknown"sv; } [[nodiscard]] static std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome) { switch (Outcome) { case ComputeTaskOutcome::Success: return "Success"sv; case ComputeTaskOutcome::Failed: return "Failed"sv; case ComputeTaskOutcome::Cancelled: return "Cancelled"sv; case ComputeTaskOutcome::NoResult: return "NoResult"sv; case ComputeTaskOutcome::Exipred: return "Exipred"sv; case ComputeTaskOutcome::BlobNotFound: return "BlobNotFound"sv; case ComputeTaskOutcome::Exception: return "Exception"sv; }; return "Unknown"sv; } virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) override { int64_t Bytes{}; double ElapsedSeconds{}; { std::scoped_lock Lock(m_TaskMutex); if (m_PendingTasks.empty()) { if 
(m_CompletedTasks.empty()) { // Nothing to do. return {.Success = true}; } UpstreamApplyCompleted CompletedTasks; std::swap(CompletedTasks, m_CompletedTasks); return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; } } try { CloudCacheSession ComputeSession(m_Client); CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId); Log().debug("Get compute updates Bytes={} Duration={}s Result={}", UpdatesResult.Bytes, UpdatesResult.ElapsedSeconds, UpdatesResult.Success); Bytes += UpdatesResult.Bytes; ElapsedSeconds += UpdatesResult.ElapsedSeconds; if (!UpdatesResult.Success) { return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } if (!UpdatesResult.Success) { return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } CbObject TaskStatus = LoadCompactBinaryObject(std::move(UpdatesResult.Response)); for (auto& It : TaskStatus["u"sv]) { CbObjectView Status = It.AsObjectView(); IoHash TaskId = Status["h"sv].AsHash(); const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)Status["o"sv].AsInt32(); Log().info("Task {} State={}", TaskId, ComputeTaskStateToString(State)); // Only completed tasks need to be processed if (State != ComputeTaskState::Complete) { continue; } IoHash WorkerId{}; IoHash ActionId{}; UpstreamApplyType ApplyType{}; { std::scoped_lock Lock(m_TaskMutex); auto TaskIt = m_PendingTasks.find(TaskId); if (TaskIt != m_PendingTasks.end()) { WorkerId = TaskIt->second.WorkerDescriptor.GetHash(); ActionId = TaskIt->second.Action.GetHash(); ApplyType = TaskIt->second.Type; m_PendingTasks.erase(TaskIt); } } if (WorkerId == IoHash::Zero) { Log().warn("Task {} missing from pending tasks", TaskId); continue; } std::map Timepoints; 
ProcessQueueTimings(Status["qs"sv].AsObjectView(), Timepoints); ProcessExecuteTimings(Status["es"sv].AsObjectView(), Timepoints); if (Outcome != ComputeTaskOutcome::Success) { const std::string_view Detail = Status["d"sv].AsString(); { std::scoped_lock Lock(m_TaskMutex); m_CompletedTasks[WorkerId][ActionId] = { .Error{.ErrorCode = -1, .Reason = fmt::format("Task {} {}", ComputeTaskOutcomeToString(Outcome), Detail)}, .Timepoints = std::move(Timepoints)}; } continue; } Timepoints["zen-complete-queue-added"] = DateTime::NowTicks(); ThreadPool.ScheduleWork([this, ApplyType, ResultHash = Status["r"sv].AsHash(), Timepoints = std::move(Timepoints), TaskId = std::move(TaskId), WorkerId = std::move(WorkerId), ActionId = std::move(ActionId)]() mutable { Timepoints["zen-complete-queue-dispatched"] = DateTime::NowTicks(); GetUpstreamApplyResult Result = ProcessTaskStatus(ApplyType, ResultHash); Timepoints["zen-complete-queue-complete"] = DateTime::NowTicks(); Result.Timepoints.merge(Timepoints); Log().debug("Task Processed {} Files={} Attachments={} ExitCode={}", TaskId, Result.OutputFiles.size(), Result.OutputPackage.GetAttachments().size(), Result.Error.ErrorCode); { std::scoped_lock Lock(m_TaskMutex); m_CompletedTasks[WorkerId][ActionId] = std::move(Result); } }); } { std::scoped_lock Lock(m_TaskMutex); if (m_CompletedTasks.empty()) { // Nothing to do. 
return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; } UpstreamApplyCompleted CompletedTasks; std::swap(CompletedTasks, m_CompletedTasks); return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; } } catch (std::exception& Err) { m_HealthOk = false; return { .Error{.ErrorCode = -1, .Reason = Err.what()}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, }; } } virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; } private: spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; CasStore& m_CasStore; CidStore& m_CidStore; AuthMgr& m_AuthMgr; std::string m_DisplayName; RefPtr m_Client; RefPtr m_StorageClient; UpstreamApplyEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; std::string m_ChannelId; std::mutex m_TaskMutex; std::unordered_map m_PendingTasks; UpstreamApplyCompleted m_CompletedTasks; struct UpstreamData { std::map Blobs; std::map Objects; IoHash TaskId; IoHash RequirementsId; }; struct UpstreamDirectory { std::filesystem::path Path; std::map Directories; std::set Files; }; static void ProcessQueueTimings(CbObjectView QueueStats, std::map& Timepoints) { uint64_t Ticks = QueueStats["t"sv].AsDateTimeTicks(); if (Ticks == 0) { return; } // Scope is an array of miliseconds after start time // TODO: cleanup Timepoints["horde-queue-added"] = Ticks; int Index = 0; for (auto& Item : QueueStats["s"sv].AsArrayView()) { Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond; switch (Index) { case 0: Timepoints["horde-queue-dispatched"] = Ticks; break; case 1: Timepoints["horde-queue-complete"] = Ticks; break; } Index++; } } static void ProcessExecuteTimings(CbObjectView ExecutionStats, std::map& Timepoints) { uint64_t Ticks = ExecutionStats["t"sv].AsDateTimeTicks(); if (Ticks == 0) { return; } // Scope is an array of miliseconds after start time // TODO: cleanup Timepoints["horde-execution-start"] = Ticks; int Index = 0; for (auto& Item : 
ExecutionStats["s"sv].AsArrayView()) { Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond; switch (Index) { case 0: Timepoints["horde-execution-download-ref"] = Ticks; break; case 1: Timepoints["horde-execution-download-input"] = Ticks; break; case 2: Timepoints["horde-execution-execute"] = Ticks; break; case 3: Timepoints["horde-execution-upload-log"] = Ticks; break; case 4: Timepoints["horde-execution-upload-output"] = Ticks; break; case 5: Timepoints["horde-execution-upload-ref"] = Ticks; break; } Index++; } } [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const UpstreamApplyType ApplyType, const IoHash& ResultHash) { try { CloudCacheSession Session(m_StorageClient); GetUpstreamApplyResult ApplyResult{}; IoHash StdOutHash; IoHash StdErrHash; IoHash OutputHash; std::map BinaryData; { CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject); Log().debug("Get ref {} Bytes={} Duration={}s Result={}", ResultHash, ObjectRefResult.Bytes, ObjectRefResult.ElapsedSeconds, ObjectRefResult.Success); ApplyResult.Bytes += ObjectRefResult.Bytes; ApplyResult.ElapsedSeconds += ObjectRefResult.ElapsedSeconds; ApplyResult.Timepoints["zen-storage-get-ref"] = DateTime::NowTicks(); if (!ObjectRefResult.Success) { ApplyResult.Error.Reason = "Failed to get result object data"; return ApplyResult; } CbObject ResultObject = LoadCompactBinaryObject(ObjectRefResult.Response); ApplyResult.Error.ErrorCode = ResultObject["e"sv].AsInt32(); StdOutHash = ResultObject["so"sv].AsBinaryAttachment(); StdErrHash = ResultObject["se"sv].AsBinaryAttachment(); OutputHash = ResultObject["o"sv].AsObjectAttachment(); } { std::set NeededData; if (OutputHash != IoHash::Zero) { GetObjectReferencesResult ObjectReferenceResult = Session.GetObjectReferences(OutputHash); Log().debug("Get object references {} References={} Bytes={} Duration={}s Result={}", ResultHash, ObjectReferenceResult.References.size(), ObjectReferenceResult.Bytes, 
ObjectReferenceResult.ElapsedSeconds, ObjectReferenceResult.Success); ApplyResult.Bytes += ObjectReferenceResult.Bytes; ApplyResult.ElapsedSeconds += ObjectReferenceResult.ElapsedSeconds; ApplyResult.Timepoints["zen-storage-get-object-references"] = DateTime::NowTicks(); if (!ObjectReferenceResult.Success) { ApplyResult.Error.Reason = "Failed to get result object references"; return ApplyResult; } NeededData = std::move(ObjectReferenceResult.References); } NeededData.insert(OutputHash); NeededData.insert(StdOutHash); NeededData.insert(StdErrHash); for (const auto& Hash : NeededData) { if (Hash == IoHash::Zero) { continue; } CloudCacheResult BlobResult = Session.GetBlob(Hash); Log().debug("Get blob {} Bytes={} Duration={}s Result={}", Hash, BlobResult.Bytes, BlobResult.ElapsedSeconds, BlobResult.Success); ApplyResult.Bytes += BlobResult.Bytes; ApplyResult.ElapsedSeconds += BlobResult.ElapsedSeconds; if (!BlobResult.Success) { ApplyResult.Error.Reason = "Failed to get blob"; return ApplyResult; } BinaryData[Hash] = std::move(BlobResult.Response); } ApplyResult.Timepoints["zen-storage-get-blobs"] = DateTime::NowTicks(); } ApplyResult.StdOut = StdOutHash != IoHash::Zero ? std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize()) : ""; ApplyResult.StdErr = StdErrHash != IoHash::Zero ? 
std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize()) : ""; if (OutputHash == IoHash::Zero) { ApplyResult.Error.Reason = "Task completed with no output object"; return ApplyResult; } CbObject OutputObject = LoadCompactBinaryObject(BinaryData[OutputHash]); switch (ApplyType) { case UpstreamApplyType::Simple: { ResolveMerkleTreeDirectory(""sv, OutputHash, BinaryData, ApplyResult.OutputFiles); for (const auto& Pair : BinaryData) { ApplyResult.FileData[Pair.first] = std::move(BinaryData.at(Pair.first)); } ApplyResult.Success = ApplyResult.Error.ErrorCode == 0; return ApplyResult; } break; case UpstreamApplyType::Asset: { if (ApplyResult.Error.ErrorCode != 0) { ApplyResult.Error.Reason = "Task completed with errors"; return ApplyResult; } // Get build.output IoHash BuildOutputId; IoBuffer BuildOutput; for (auto& It : OutputObject["f"sv]) { const CbObjectView FileObject = It.AsObjectView(); if (FileObject["n"sv].AsString() == "Build.output"sv) { BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); BuildOutput = BinaryData[BuildOutputId]; break; } } if (BuildOutput.GetSize() == 0) { ApplyResult.Error.Reason = "Build.output file not found in task results"; return ApplyResult; } // Get Output directory node IoBuffer OutputDirectoryTree; for (auto& It : OutputObject["d"sv]) { const CbObjectView DirectoryObject = It.AsObjectView(); if (DirectoryObject["n"sv].AsString() == "Outputs"sv) { OutputDirectoryTree = BinaryData[DirectoryObject["h"sv].AsObjectAttachment()]; break; } } if (OutputDirectoryTree.GetSize() == 0) { ApplyResult.Error.Reason = "Outputs directory not found in task results"; return ApplyResult; } // load build.output as CbObject // Move Outputs from Horde to CbPackage std::unordered_map CidToCompressedId; CbPackage OutputPackage; CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree); for (auto& It : OutputDirectoryTreeObject["f"sv]) { CbObjectView FileObject = It.AsObjectView(); // Name is the 
uncompressed hash IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString()); // Hash is the compressed data hash, and how it is stored in Horde IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); if (!BinaryData.contains(CompressedId)) { Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId); ApplyResult.Error.Reason = "Object attachment chunk not retrieved from Horde"; return ApplyResult; } CidToCompressedId[DecompressedId] = CompressedId; } // Iterate attachments, verify all chunks exist, and add to CbPackage bool AnyErrors = false; CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput); BuildOutputObject.IterateAttachments([&](CbFieldView Field) { const IoHash DecompressedId = Field.AsHash(); if (!CidToCompressedId.contains(DecompressedId)) { Log().warn("Attachment not found {}", DecompressedId); AnyErrors = true; return; } const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); if (!BinaryData.contains(CompressedId)) { Log().warn("Missing output {} compressed {} uncompressed", CompressedId, DecompressedId); AnyErrors = true; return; } CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId])); if (!AttachmentBuffer) { Log().warn( "Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", CompressedId, DecompressedId); AnyErrors = true; return; } ApplyResult.TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); ApplyResult.TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize(); CbAttachment Attachment(AttachmentBuffer); OutputPackage.AddAttachment(Attachment); }); if (AnyErrors) { ApplyResult.Error.Reason = "Failed to get result object attachment data"; return ApplyResult; } OutputPackage.SetObject(BuildOutputObject); ApplyResult.OutputPackage = std::move(OutputPackage); ApplyResult.Success = ApplyResult.Error.ErrorCode == 0; return ApplyResult; } break; } ApplyResult.Error.Reason = "Unknown 
apply type"; return ApplyResult; } catch (std::exception& Err) { return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) { std::string ExecutablePath; std::string WorkingDirectory; std::vector Arguments; std::map Environment; std::set InputFiles; std::set Outputs; std::map InputFileHashes; ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString(); if (ExecutablePath.empty()) { Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor", ApplyRecord.WorkerDescriptor.GetHash()); return false; } WorkingDirectory = ApplyRecord.WorkerDescriptor["workdir"sv].AsString(); for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv]) { CbObjectView FileEntry = It.AsObjectView(); if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) { return false; } } for (auto& It : ApplyRecord.WorkerDescriptor["files"sv]) { CbObjectView FileEntry = It.AsObjectView(); if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) { return false; } } for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv]) { std::string_view Directory = It.AsString(); std::string DummyFile = fmt::format("{}/.zen_empty_file", Directory); InputFiles.insert(DummyFile); Data.Blobs[EmptyBufferId] = EmptyBuffer; InputFileHashes[DummyFile] = EmptyBufferId; } if (!WorkingDirectory.empty()) { std::string DummyFile = fmt::format("{}/.zen_empty_file", WorkingDirectory); InputFiles.insert(DummyFile); Data.Blobs[EmptyBufferId] = EmptyBuffer; InputFileHashes[DummyFile] = EmptyBufferId; } for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv]) { std::string_view Env = It.AsString(); auto Index = Env.find('='); if (Index == std::string_view::npos) { Log().warn("process apply upstream FAILED, environment '{}' malformed", Env); return false; } Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1); } switch (ApplyRecord.Type) { case 
UpstreamApplyType::Simple: { for (auto& It : ApplyRecord.WorkerDescriptor["arguments"sv]) { Arguments.push_back(std::string(It.AsString())); } for (auto& It : ApplyRecord.WorkerDescriptor["outputs"sv]) { Outputs.insert(std::string(It.AsString())); } } break; case UpstreamApplyType::Asset: { static const std::filesystem::path BuildActionPath = "Build.action"sv; static const std::filesystem::path InputPath = "Inputs"sv; const IoHash ActionId = ApplyRecord.Action.GetHash(); Arguments.push_back("-Build=build.action"); Outputs.insert("Build.output"); Outputs.insert("Outputs"); InputFiles.insert(BuildActionPath); InputFileHashes[BuildActionPath] = ActionId; Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(), ApplyRecord.Action.GetBuffer().GetSize()); bool AnyErrors = false; ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { const IoHash Cid = Field.AsHash(); const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); if (!DataBuffer) { Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); AnyErrors = true; return; } if (InputFiles.contains(FilePath)) { return; } const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize()); InputFiles.insert(FilePath); InputFileHashes[FilePath] = CompressedId; Data.Blobs[CompressedId] = std::move(DataBuffer); }); if (AnyErrors) { return false; } } break; } const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles); CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects); const IoHash SandboxHash = Sandbox.GetHash(); Data.Objects[SandboxHash] = std::move(Sandbox); { std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); if (HostPlatform.empty()) { Log().warn("process apply upstream FAILED, 'host' platform not provided"); return false; } int32_t LogicalCores = 
ApplyRecord.WorkerDescriptor["cores"sv].AsInt32(); int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64(); bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); std::string Condition = fmt::format("Platform == '{}'", HostPlatform); if (HostPlatform == "Win64") { // TODO // Condition += " && Pool == 'Win-RemoteExec'"; } std::map Resources; if (LogicalCores > 0) { Resources["LogicalCores"sv] = LogicalCores; } if (Memory > 0) { Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL); } CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive); const IoHash RequirementsId = Requirements.GetHash(); Data.Objects[RequirementsId] = std::move(Requirements); Data.RequirementsId = RequirementsId; } CbObject Task = BuildTask(ExecutablePath, Arguments, Environment, WorkingDirectory, SandboxHash, Data.RequirementsId, Outputs); const IoHash TaskId = Task.GetHash(); Data.Objects[TaskId] = std::move(Task); Data.TaskId = TaskId; return true; } [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry, std::set& InputFiles, std::map& InputFileHashes, std::map& Blobs) { const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); const IoHash ChunkId = FileEntry["hash"sv].AsHash(); const uint64_t Size = FileEntry["size"sv].AsUInt64(); IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId); if (!DataBuffer) { Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); return false; } if (DataBuffer.Size() != Size) { Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}", ChunkId, DataBuffer.Size(), Size); return false; } if (InputFiles.contains(FilePath)) { Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath); return false; } InputFiles.insert(FilePath); InputFileHashes[FilePath] = ChunkId; Blobs[ChunkId] = std::move(DataBuffer); return true; } [[nodiscard]] 
// Builds a nested UpstreamDirectory tree from a flat set of input file
// paths, creating intermediate directories on demand. Returns the root
// directory by value.
// NOTE(review): std::set/std::map template arguments were stripped by text
// extraction (InputFiles presumably a set of std::filesystem::path,
// AllDirectories a path -> UpstreamDirectory* index) — confirm against the
// original file. Only comments and line breaks were added in this region.
UpstreamDirectory BuildDirectoryTree(const std::set& InputFiles)
{
	static const std::filesystem::path RootPath;
	// Index of every directory created so far, so repeated parents are found
	// without re-walking the tree. Pointers into the tree are taken below —
	// assumes UpstreamDirectory::Directories is a node-stable container
	// (TODO confirm).
	std::map AllDirectories;
	UpstreamDirectory RootDirectory = {.Path = RootPath};
	AllDirectories[RootPath] = &RootDirectory;
	// Build tree from flat list
	for (const auto& Path : InputFiles)
	{
		if (Path.has_parent_path())
		{
			if (!AllDirectories.contains(Path.parent_path()))
			{
				// Split the parent path into components; the stack reverses
				// the bottom-up walk so the topmost component pops first.
				std::stack PathSplit;
				{
					std::filesystem::path ParentPath = Path.parent_path();
					PathSplit.push(ParentPath.filename().string());
					while (ParentPath.has_parent_path())
					{
						ParentPath = ParentPath.parent_path();
						PathSplit.push(ParentPath.filename().string());
					}
				}
				// Descend from the root, creating any missing directories and
				// registering each new one in the index.
				UpstreamDirectory* ParentPtr = &RootDirectory;
				while (!PathSplit.empty())
				{
					if (!ParentPtr->Directories.contains(PathSplit.top()))
					{
						std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()};
						ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath};
						AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()];
					}
					ParentPtr = &ParentPtr->Directories[PathSplit.top()];
					PathSplit.pop();
				}
			}
			AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string());
		}
		else
		{
			// No parent: the file lives directly in the root directory.
			RootDirectory.Files.insert(Path.filename().string());
		}
	}
	return RootDirectory;
}

// Serializes one UpstreamDirectory level to a compact-binary object:
// "f" = array of {n: name, h: hash, s: size} file entries,
// "d" = array of {n: name, h: hash} subdirectory entries.
// Child directories are serialized first (depth-first) and stored in
// Objects under their own hash; the parent references each child as an
// object attachment, forming a merkle tree.
// NOTE(review): map template arguments stripped by extraction — confirm.
[[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory, const std::map& InputFileHashes, const std::map& Blobs, std::map& Objects)
{
	CbObjectWriter DirectoryTreeWriter;
	if (!RootDirectory.Files.empty())
	{
		DirectoryTreeWriter.BeginArray("f"sv);
		for (const auto& File : RootDirectory.Files)
		{
			const std::filesystem::path FilePath = {RootDirectory.Path / File};
			const IoHash& FileHash = InputFileHashes.at(FilePath);
			const uint64_t FileSize = Blobs.at(FileHash).Size();
			DirectoryTreeWriter.BeginObject();
			DirectoryTreeWriter.AddString("n"sv, File);
			DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash);
			DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size
			// DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded
			DirectoryTreeWriter.EndObject();
		}
		DirectoryTreeWriter.EndArray();
	}
	if (!RootDirectory.Directories.empty())
	{
		DirectoryTreeWriter.BeginArray("d"sv);
		for (const auto& Item : RootDirectory.Directories)
		{
			// Recurse first so the child's hash is known before the parent
			// references it.
			CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects);
			const IoHash DirectoryHash = Directory.GetHash();
			Objects[DirectoryHash] = std::move(Directory);
			DirectoryTreeWriter.BeginObject();
			DirectoryTreeWriter.AddString("n"sv, Item.first);
			DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash);
			DirectoryTreeWriter.EndObject();
		}
		DirectoryTreeWriter.EndArray();
	}
	return DirectoryTreeWriter.Save();
}

// Walks a serialized merkle directory tree (inverse of
// BuildMerkleTreeDirectory): collects every "f" entry into OutputFiles as
// path -> binary-attachment hash, and recurses into each "d" entry with the
// child's path prefix.
// NOTE(review): map template arguments stripped by extraction — confirm.
void ResolveMerkleTreeDirectory(const std::filesystem::path& ParentDirectory, const IoHash& DirectoryHash, const std::map& Objects, std::map& OutputFiles)
{
	CbObject Directory = LoadCompactBinaryObject(Objects.at(DirectoryHash));
	for (auto& It : Directory["f"sv])
	{
		const CbObjectView FileObject = It.AsObjectView();
		const std::filesystem::path Path = ParentDirectory / FileObject["n"sv].AsString();
		OutputFiles[Path] = FileObject["h"sv].AsBinaryAttachment();
	}
	for (auto& It : Directory["d"sv])
	{
		const CbObjectView DirectoryObject = It.AsObjectView();
		ResolveMerkleTreeDirectory(ParentDirectory / DirectoryObject["n"sv].AsString(), DirectoryObject["h"sv].AsObjectAttachment(), Objects, OutputFiles);
	}
}

// Serializes agent requirements: "c" = condition expression, "r" = optional
// array of [name, amount] resource pairs, "e" = exclusive-agent flag.
// NOTE(review): the Resources map's template arguments were stripped by
// extraction — confirm against the original file.
[[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, const std::map& Resources, const bool Exclusive)
{
	CbObjectWriter Writer;
	Writer.AddString("c", Condition);
	if (!Resources.empty())
	{
		Writer.BeginArray("r");
		for (const auto& Resource : Resources)
		{
			Writer.BeginArray();
			Writer.AddString(Resource.first);
			Writer.AddInteger(Resource.second);
			Writer.EndArray();
		}
		Writer.EndArray();
	}
	Writer.AddBool("e", Exclusive);
	return Writer.Save();
}

// Serializes the task description: "e" executable, "a" argument array, "v"
// environment [name, value] pairs, "w" working directory, "s" sandbox
// attachment, "r" requirements attachment, "o" expected output paths.
// (The parameter list continues on the next source line.)
// NOTE(review): vector/map/set template arguments stripped by extraction.
[[nodiscard]] CbObject BuildTask(const std::string_view Executable, const std::vector& Arguments, const std::map& Environment, const std::string_view WorkingDirectory, const
// (continuation of BuildTask's parameter list from the previous line)
IoHash& SandboxHash, const IoHash& RequirementsId, const std::set& Outputs)
{
	CbObjectWriter TaskWriter;
	TaskWriter.AddString("e"sv, Executable);
	if (!Arguments.empty())
	{
		TaskWriter.BeginArray("a"sv);
		for (const auto& Argument : Arguments)
		{
			TaskWriter.AddString(Argument);
		}
		TaskWriter.EndArray();
	}
	if (!Environment.empty())
	{
		// Environment variables are written as [name, value] pair arrays.
		TaskWriter.BeginArray("v"sv);
		for (const auto& Env : Environment)
		{
			TaskWriter.BeginArray();
			TaskWriter.AddString(Env.first);
			TaskWriter.AddString(Env.second);
			TaskWriter.EndArray();
		}
		TaskWriter.EndArray();
	}
	if (!WorkingDirectory.empty())
	{
		TaskWriter.AddString("w"sv, WorkingDirectory);
	}
	// Sandbox (input tree) and requirements objects are referenced by hash.
	TaskWriter.AddObjectAttachment("s"sv, SandboxHash);
	TaskWriter.AddObjectAttachment("r"sv, RequirementsId);
	// Outputs
	if (!Outputs.empty())
	{
		TaskWriter.BeginArray("o"sv);
		for (const auto& Output : Outputs)
		{
			TaskWriter.AddString(Output);
		}
		TaskWriter.EndArray();
	}
	return TaskWriter.Save();
}
};
} // namespace detail

//////////////////////////////////////////////////////////////////////////

// Factory for the Horde upstream-apply endpoint; forwards all configuration
// (compute + storage client options and auth configs, local stores, auth
// manager) to the detail implementation.
// NOTE(review): the template arguments of std::unique_ptr and
// std::make_unique appear to have been stripped by text extraction
// (presumably UpstreamApplyEndpoint and detail::HordeUpstreamApplyEndpoint
// respectively) — confirm against the original file.
std::unique_ptr UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, const UpstreamAuthConfig& ComputeAuthConfig, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& StorageAuthConfig, CasStore& CasStore, CidStore& CidStore, AuthMgr& Mgr)
{
	return std::make_unique(ComputeOptions, ComputeAuthConfig, StorageOptions, StorageAuthConfig, CasStore, CidStore, Mgr);
}
} // namespace zen
#endif // ZEN_WITH_COMPUTE_SERVICES