diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/upstream/hordecompute.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver/upstream/hordecompute.cpp')
| -rw-r--r-- | src/zenserver/upstream/hordecompute.cpp | 1457 |
1 files changed, 1457 insertions, 0 deletions
diff --git a/src/zenserver/upstream/hordecompute.cpp b/src/zenserver/upstream/hordecompute.cpp new file mode 100644 index 000000000..64d9fff72 --- /dev/null +++ b/src/zenserver/upstream/hordecompute.cpp @@ -0,0 +1,1457 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "upstreamapply.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "jupiter.h" + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compactbinaryvalidation.h> +# include <zencore/fmtutils.h> +# include <zencore/session.h> +# include <zencore/stream.h> +# include <zencore/thread.h> +# include <zencore/timer.h> +# include <zencore/workthreadpool.h> + +# include <zenstore/cidstore.h> + +# include <auth/authmgr.h> +# include <upstream/upstreamcache.h> + +# include "cache/structuredcachestore.h" +# include "diag/logging.h" + +# include <fmt/format.h> + +# include <algorithm> +# include <atomic> +# include <set> +# include <stack> + +namespace zen { + +using namespace std::literals; + +static const IoBuffer EmptyBuffer; +static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer); + +namespace detail { + + class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint + { + public: + HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CidStore& CidStore, + AuthMgr& Mgr) + : m_Log(logging::Get("upstream-apply")) + , m_CidStore(CidStore) + , m_AuthMgr(Mgr) + { + m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl); + m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString()); + + { + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + + if (ComputeAuthConfig.OAuthUrl.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl, + .ClientId = ComputeAuthConfig.OAuthClientId, + .ClientSecret = ComputeAuthConfig.OAuthClientSecret}); + } + else if (ComputeAuthConfig.OpenIdProvider.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() { + AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else + { + CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken), + .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; + TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + } + + m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider)); + } + + { + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + + if (StorageAuthConfig.OAuthUrl.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl, + .ClientId = StorageAuthConfig.OAuthClientId, + .ClientSecret = StorageAuthConfig.OAuthClientSecret}); + } + else if (StorageAuthConfig.OpenIdProvider.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() { + AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else + { + CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken), + .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; + TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + } + + m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider)); + } + } + + virtual ~HordeUpstreamApplyEndpoint() = default; + + virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } + + virtual bool IsHealthy() const override { return m_HealthOk.load(); } + + virtual UpstreamEndpointHealth CheckHealth() override + { + try + { + CloudCacheSession Session(m_Client); + CloudCacheResult Result = Session.Authenticate(); + + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; + } + catch (std::exception& Err) + { + return {.Reason = Err.what(), .Ok = false}; + } + } + + virtual std::string_view DisplayName() const override { return m_DisplayName; } + + virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) override + { + PostUpstreamApplyResult ApplyResult{}; + ApplyResult.Timepoints.merge(ApplyRecord.Timepoints); + + try + { + UpstreamData UpstreamData; + if (!ProcessApplyKey(ApplyRecord, UpstreamData)) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}}; + } + + { + ApplyResult.Timepoints["zen-storage-build-ref"] = DateTime::NowTicks(); + + bool AlreadyQueued; + { + std::scoped_lock Lock(m_TaskMutex); + AlreadyQueued = m_PendingTasks.contains(UpstreamData.TaskId); + } + if (AlreadyQueued) + { + // Pending task is already queued, return success + ApplyResult.Success = true; + return ApplyResult; + } + m_PendingTasks[UpstreamData.TaskId] = std::move(ApplyRecord); + } + + CloudCacheSession ComputeSession(m_Client); + CloudCacheSession StorageSession(m_StorageClient); + + { + CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs, UpstreamData.CasIds); + ApplyResult.Bytes += Result.Bytes; + ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-upload-blobs"] = DateTime::NowTicks(); + if (!Result.Success) + { + ApplyResult.Error = {.ErrorCode = Result.ErrorCode, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"}; + return ApplyResult; + } + UpstreamData.Blobs.clear(); + UpstreamData.CasIds.clear(); + } + + { + CloudCacheResult Result = BatchPutCompressedBlobsIfMissing(StorageSession, UpstreamData.Cids); + ApplyResult.Bytes += Result.Bytes; + ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-upload-compressed-blobs"] = DateTime::NowTicks(); + if (!Result.Success) + { + ApplyResult.Error = { + .ErrorCode = Result.ErrorCode, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload compressed blobs"}; + return ApplyResult; + } + UpstreamData.Cids.clear(); + } + + { + CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects); + ApplyResult.Bytes += Result.Bytes; + ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-upload-objects"] = DateTime::NowTicks(); + if (!Result.Success) + { + ApplyResult.Error = {.ErrorCode = Result.ErrorCode, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload objects"}; + return ApplyResult; + } + } + + { + PutRefResult RefResult = StorageSession.PutRef(StorageSession.Client().DefaultBlobStoreNamespace(), + "requests"sv, + UpstreamData.TaskId, + UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), + ZenContentType::kCbObject); + Log().debug("Put ref {} Need={} Bytes={} Duration={}s Result={}", + UpstreamData.TaskId, + RefResult.Needs.size(), + RefResult.Bytes, + RefResult.ElapsedSeconds, + RefResult.Success); + ApplyResult.Bytes += RefResult.Bytes; + ApplyResult.ElapsedSeconds += RefResult.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-put-ref"] = DateTime::NowTicks(); + + if (RefResult.Needs.size() > 0) + { + Log().error("Failed to add task ref {} due to {} missing blobs", UpstreamData.TaskId, RefResult.Needs.size()); + for (const auto& Hash : RefResult.Needs) + { + Log().debug("Task ref {} missing blob {}", UpstreamData.TaskId, Hash); + } + + ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode, + .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) + : "Failed to add task ref due to missing blob"}; + return ApplyResult; + } + + if (!RefResult.Success) + { + ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode, + .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) : "Failed to add task ref"}; + return ApplyResult; + } + UpstreamData.Objects.clear(); + } + + { + CbObjectWriter Writer; + Writer.AddString("c"sv, m_ChannelId); + Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); + Writer.BeginArray("t"sv); + Writer.AddObjectAttachment(UpstreamData.TaskId); + Writer.EndArray(); + CbObject TasksObject = Writer.Save(); + IoBuffer TasksData = TasksObject.GetBuffer().AsIoBuffer(); + + CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData); + Log().debug("Post compute task {} Bytes={} Duration={}s Result={}", + TasksObject.GetHash(), + Result.Bytes, + Result.ElapsedSeconds, + Result.Success); + ApplyResult.Bytes += Result.Bytes; + ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; + ApplyResult.Timepoints["zen-horde-post-task"] = DateTime::NowTicks(); + if (!Result.Success) + { + { + std::scoped_lock Lock(m_TaskMutex); + m_PendingTasks.erase(UpstreamData.TaskId); + } + + ApplyResult.Error = {.ErrorCode = Result.ErrorCode, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"}; + return ApplyResult; + } + } + + Log().info("Task posted {}", UpstreamData.TaskId); + ApplyResult.Success = true; + return ApplyResult; + } + catch (std::exception& Err) + { + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + } + } + + [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, + const std::map<IoHash, IoBuffer>& Blobs, + const std::set<IoHash>& CasIds) + { + if (Blobs.size() == 0 && CasIds.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing blobs + std::set<IoHash> Keys; + std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); + Keys.insert(CasIds.begin(), CasIds.end()); + + CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().DefaultBlobStoreNamespace(), Keys); + Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}", + Keys.size(), + ExistsResult.Needs.size(), + ExistsResult.ElapsedSeconds, + ExistsResult.Success); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (!ExistsResult.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"}; + } + + for (const auto& Hash : ExistsResult.Needs) + { + IoBuffer DataBuffer; + if (Blobs.contains(Hash)) + { + DataBuffer = Blobs.at(Hash); + } + else + { + DataBuffer = m_CidStore.FindChunkByCid(Hash); + if (!DataBuffer) + { + Log().warn("Put blob FAILED, input chunk '{}' missing", Hash); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put blobs"}; + } + } + + CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer); + Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + + [[nodiscard]] CloudCacheResult BatchPutCompressedBlobsIfMissing(CloudCacheSession& Session, const std::set<IoHash>& Cids) + { + if (Cids.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing compressed blobs + CloudCacheExistsResult ExistsResult = Session.CompressedBlobExists(Session.Client().DefaultBlobStoreNamespace(), Cids); + Log().debug("Queried {} missing compressed blobs Need={} Duration={}s Result={}", + Cids.size(), + ExistsResult.Needs.size(), + ExistsResult.ElapsedSeconds, + ExistsResult.Success); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (!ExistsResult.Success) + { + return { + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if compressed blobs exist"}; + } + + for (const auto& Hash : ExistsResult.Needs) + { + IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Hash); + if (!DataBuffer) + { + Log().warn("Put compressed blob FAILED, input CID chunk '{}' missing", Hash); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put compressed blobs"}; + } + + CloudCacheResult Result = Session.PutCompressedBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer); + Log().debug("Put compressed blob {} Bytes={} Duration={}s Result={}", + Hash, + Result.Bytes, + Result.ElapsedSeconds, + Result.Success); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put compressed blobs"}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + + [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects) + { + if (Objects.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing objects + std::set<IoHash> Keys; + std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); + + CloudCacheExistsResult ExistsResult = Session.ObjectExists(Session.Client().DefaultBlobStoreNamespace(), Keys); + Log().debug("Queried {} missing objects Need={} Duration={}s Result={}", + Keys.size(), + ExistsResult.Needs.size(), + ExistsResult.ElapsedSeconds, + ExistsResult.Success); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (!ExistsResult.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if objects exist"}; + } + + for (const auto& Hash : ExistsResult.Needs) + { + CloudCacheResult Result = + Session.PutObject(Session.Client().DefaultBlobStoreNamespace(), Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); + Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + + enum class ComputeTaskState : int32_t + { + Queued = 0, + Executing = 1, + Complete = 2, + }; + + enum class ComputeTaskOutcome : int32_t + { + Success = 0, + Failed = 1, + Cancelled = 2, + NoResult = 3, + Exipred = 4, + BlobNotFound = 5, + Exception = 6, + }; + + [[nodiscard]] static std::string_view ComputeTaskStateToString(const ComputeTaskState Outcome) + { + switch (Outcome) + { + case ComputeTaskState::Queued: + return "Queued"sv; + case ComputeTaskState::Executing: + return "Executing"sv; + case ComputeTaskState::Complete: + return "Complete"sv; + }; + return "Unknown"sv; + } + + [[nodiscard]] static std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome) + { + switch (Outcome) + { + case ComputeTaskOutcome::Success: + return "Success"sv; + case ComputeTaskOutcome::Failed: + return "Failed"sv; + case ComputeTaskOutcome::Cancelled: + return "Cancelled"sv; + case ComputeTaskOutcome::NoResult: + return "NoResult"sv; + case ComputeTaskOutcome::Exipred: + return "Exipred"sv; + case ComputeTaskOutcome::BlobNotFound: + return "BlobNotFound"sv; + case ComputeTaskOutcome::Exception: + return "Exception"sv; + }; + return "Unknown"sv; + } + + virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) override + { + int64_t Bytes{}; + double ElapsedSeconds{}; + + { + std::scoped_lock Lock(m_TaskMutex); + if (m_PendingTasks.empty()) + { + if (m_CompletedTasks.empty()) + { + // Nothing to do. + return {.Success = true}; + } + + UpstreamApplyCompleted CompletedTasks; + std::swap(CompletedTasks, m_CompletedTasks); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; + } + } + + try + { + CloudCacheSession ComputeSession(m_Client); + + CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId); + Log().debug("Get compute updates Bytes={} Duration={}s Result={}", + UpdatesResult.Bytes, + UpdatesResult.ElapsedSeconds, + UpdatesResult.Success); + Bytes += UpdatesResult.Bytes; + ElapsedSeconds += UpdatesResult.ElapsedSeconds; + if (!UpdatesResult.Success) + { + return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + if (!UpdatesResult.Success) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; + } + + CbObject TaskStatus = LoadCompactBinaryObject(std::move(UpdatesResult.Response)); + + for (auto& It : TaskStatus["u"sv]) + { + CbObjectView Status = It.AsObjectView(); + IoHash TaskId = Status["h"sv].AsHash(); + const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); + const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)Status["o"sv].AsInt32(); + + Log().info("Task {} State={}", TaskId, ComputeTaskStateToString(State)); + + // Only completed tasks need to be processed + if (State != ComputeTaskState::Complete) + { + continue; + } + + IoHash WorkerId{}; + IoHash ActionId{}; + UpstreamApplyType ApplyType{}; + + { + std::scoped_lock Lock(m_TaskMutex); + auto TaskIt = m_PendingTasks.find(TaskId); + if (TaskIt != m_PendingTasks.end()) + { + WorkerId = TaskIt->second.WorkerDescriptor.GetHash(); + ActionId = TaskIt->second.Action.GetHash(); + ApplyType = TaskIt->second.Type; + m_PendingTasks.erase(TaskIt); + } + } + + if (WorkerId == IoHash::Zero) + { + Log().warn("Task {} missing from pending tasks", TaskId); + continue; + } + + std::map<std::string, uint64_t> Timepoints; + ProcessQueueTimings(Status["qs"sv].AsObjectView(), Timepoints); + ProcessExecuteTimings(Status["es"sv].AsObjectView(), Timepoints); + + if (Outcome != ComputeTaskOutcome::Success) + { + const std::string_view Detail = Status["d"sv].AsString(); + { + std::scoped_lock Lock(m_TaskMutex); + m_CompletedTasks[WorkerId][ActionId] = { + .Error{.ErrorCode = -1, .Reason = fmt::format("Task {} {}", ComputeTaskOutcomeToString(Outcome), Detail)}, + .Timepoints = std::move(Timepoints)}; + } + continue; + } + + Timepoints["zen-complete-queue-added"] = DateTime::NowTicks(); + ThreadPool.ScheduleWork([this, + ApplyType, + ResultHash = Status["r"sv].AsHash(), + Timepoints = std::move(Timepoints), + TaskId = std::move(TaskId), + WorkerId = std::move(WorkerId), + ActionId = std::move(ActionId)]() mutable { + Timepoints["zen-complete-queue-dispatched"] = DateTime::NowTicks(); + GetUpstreamApplyResult Result = ProcessTaskStatus(ApplyType, ResultHash); + Timepoints["zen-complete-queue-complete"] = DateTime::NowTicks(); + Result.Timepoints.merge(Timepoints); + + Log().debug("Task Processed {} Files={} Attachments={} ExitCode={}", + TaskId, + Result.OutputFiles.size(), + Result.OutputPackage.GetAttachments().size(), + Result.Error.ErrorCode); + { + std::scoped_lock Lock(m_TaskMutex); + m_CompletedTasks[WorkerId][ActionId] = std::move(Result); + } + }); + } + + { + std::scoped_lock Lock(m_TaskMutex); + if (m_CompletedTasks.empty()) + { + // Nothing to do. + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + UpstreamApplyCompleted CompletedTasks; + std::swap(CompletedTasks, m_CompletedTasks); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; + } + } + catch (std::exception& Err) + { + m_HealthOk = false; + return { + .Error{.ErrorCode = -1, .Reason = Err.what()}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + }; + } + } + + virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; } + + private: + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; + CidStore& m_CidStore; + AuthMgr& m_AuthMgr; + std::string m_DisplayName; + RefPtr<CloudCacheClient> m_Client; + RefPtr<CloudCacheClient> m_StorageClient; + UpstreamApplyEndpointStats m_Stats; + std::atomic_bool m_HealthOk{false}; + std::string m_ChannelId; + + std::mutex m_TaskMutex; + std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks; + UpstreamApplyCompleted m_CompletedTasks; + + struct UpstreamData + { + std::map<IoHash, IoBuffer> Blobs; + std::map<IoHash, CbObject> Objects; + std::set<IoHash> CasIds; + std::set<IoHash> Cids; + IoHash TaskId; + IoHash RequirementsId; + }; + + struct UpstreamDirectory + { + std::filesystem::path Path; + std::map<std::string, UpstreamDirectory> Directories; + std::set<std::string> Files; + }; + + static void ProcessQueueTimings(CbObjectView QueueStats, std::map<std::string, uint64_t>& Timepoints) + { + uint64_t Ticks = QueueStats["t"sv].AsDateTimeTicks(); + if (Ticks == 0) + { + return; + } + + // Scope is an array of miliseconds after start time + // TODO: cleanup + Timepoints["horde-queue-added"] = Ticks; + int Index = 0; + for (auto& Item : QueueStats["s"sv].AsArrayView()) + { + Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond; + switch (Index) + { + case 0: + Timepoints["horde-queue-dispatched"] = Ticks; + break; + case 1: + Timepoints["horde-queue-complete"] = Ticks; + break; + } + Index++; + } + } + + static void ProcessExecuteTimings(CbObjectView ExecutionStats, std::map<std::string, uint64_t>& Timepoints) + { + uint64_t Ticks = ExecutionStats["t"sv].AsDateTimeTicks(); + if (Ticks == 0) + { + return; + } + + // Scope is an array of miliseconds after start time + // TODO: cleanup + Timepoints["horde-execution-start"] = Ticks; + int Index = 0; + for (auto& Item : ExecutionStats["s"sv].AsArrayView()) + { + Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond; + switch (Index) + { + case 0: + Timepoints["horde-execution-download-ref"] = Ticks; + break; + case 1: + Timepoints["horde-execution-download-input"] = Ticks; + break; + case 2: + Timepoints["horde-execution-execute"] = Ticks; + break; + case 3: + Timepoints["horde-execution-upload-log"] = Ticks; + break; + case 4: + Timepoints["horde-execution-upload-output"] = Ticks; + break; + case 5: + Timepoints["horde-execution-upload-ref"] = Ticks; + break; + } + Index++; + } + } + + [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const UpstreamApplyType ApplyType, const IoHash& ResultHash) + { + try + { + CloudCacheSession Session(m_StorageClient); + + GetUpstreamApplyResult ApplyResult{}; + + IoHash StdOutHash; + IoHash StdErrHash; + IoHash OutputHash; + + std::map<IoHash, IoBuffer> BinaryData; + + { + CloudCacheResult ObjectRefResult = + Session.GetRef(Session.Client().DefaultBlobStoreNamespace(), "responses"sv, ResultHash, ZenContentType::kCbObject); + Log().debug("Get ref {} Bytes={} Duration={}s Result={}", + ResultHash, + ObjectRefResult.Bytes, + ObjectRefResult.ElapsedSeconds, + ObjectRefResult.Success); + ApplyResult.Bytes += ObjectRefResult.Bytes; + ApplyResult.ElapsedSeconds += ObjectRefResult.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-get-ref"] = DateTime::NowTicks(); + + if (!ObjectRefResult.Success) + { + ApplyResult.Error.Reason = "Failed to get result object data"; + return ApplyResult; + } + + CbObject ResultObject = LoadCompactBinaryObject(ObjectRefResult.Response); + ApplyResult.Error.ErrorCode = ResultObject["e"sv].AsInt32(); + StdOutHash = ResultObject["so"sv].AsBinaryAttachment(); + StdErrHash = ResultObject["se"sv].AsBinaryAttachment(); + OutputHash = ResultObject["o"sv].AsObjectAttachment(); + } + + { + std::set<IoHash> NeededData; + if (OutputHash != IoHash::Zero) + { + GetObjectReferencesResult ObjectReferenceResult = + Session.GetObjectReferences(Session.Client().DefaultBlobStoreNamespace(), OutputHash); + Log().debug("Get object references {} References={} Bytes={} Duration={}s Result={}", + ResultHash, + ObjectReferenceResult.References.size(), + ObjectReferenceResult.Bytes, + ObjectReferenceResult.ElapsedSeconds, + ObjectReferenceResult.Success); + ApplyResult.Bytes += ObjectReferenceResult.Bytes; + ApplyResult.ElapsedSeconds += ObjectReferenceResult.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-get-object-references"] = DateTime::NowTicks(); + + if (!ObjectReferenceResult.Success) + { + ApplyResult.Error.Reason = "Failed to get result object references"; + return ApplyResult; + } + + NeededData = std::move(ObjectReferenceResult.References); + } + + NeededData.insert(OutputHash); + NeededData.insert(StdOutHash); + NeededData.insert(StdErrHash); + + for (const auto& Hash : NeededData) + { + if (Hash == IoHash::Zero) + { + continue; + } + CloudCacheResult BlobResult = Session.GetBlob(Session.Client().DefaultBlobStoreNamespace(), Hash); + Log().debug("Get blob {} Bytes={} Duration={}s Result={}", + Hash, + BlobResult.Bytes, + BlobResult.ElapsedSeconds, + BlobResult.Success); + ApplyResult.Bytes += BlobResult.Bytes; + ApplyResult.ElapsedSeconds += BlobResult.ElapsedSeconds; + if (!BlobResult.Success) + { + ApplyResult.Error.Reason = "Failed to get blob"; + return ApplyResult; + } + BinaryData[Hash] = std::move(BlobResult.Response); + } + ApplyResult.Timepoints["zen-storage-get-blobs"] = DateTime::NowTicks(); + } + + ApplyResult.StdOut = StdOutHash != IoHash::Zero + ? std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize()) + : ""; + ApplyResult.StdErr = StdErrHash != IoHash::Zero + ? std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize()) + : ""; + + if (OutputHash == IoHash::Zero) + { + ApplyResult.Error.Reason = "Task completed with no output object"; + return ApplyResult; + } + + CbObject OutputObject = LoadCompactBinaryObject(BinaryData[OutputHash]); + + switch (ApplyType) + { + case UpstreamApplyType::Simple: + { + ResolveMerkleTreeDirectory(""sv, OutputHash, BinaryData, ApplyResult.OutputFiles); + for (const auto& Pair : BinaryData) + { + ApplyResult.FileData[Pair.first] = std::move(BinaryData.at(Pair.first)); + } + + ApplyResult.Success = ApplyResult.Error.ErrorCode == 0; + return ApplyResult; + } + break; + case UpstreamApplyType::Asset: + { + if (ApplyResult.Error.ErrorCode != 0) + { + ApplyResult.Error.Reason = "Task completed with errors"; + return ApplyResult; + } + + // Get build.output + IoHash BuildOutputId; + IoBuffer BuildOutput; + for (auto& It : OutputObject["f"sv]) + { + const CbObjectView FileObject = It.AsObjectView(); + if (FileObject["n"sv].AsString() == "Build.output"sv) + { + BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); + BuildOutput = BinaryData[BuildOutputId]; + break; + } + } + + if (BuildOutput.GetSize() == 0) + { + ApplyResult.Error.Reason = "Build.output file not found in task results"; + return ApplyResult; + } + + // Get Output directory node + IoBuffer OutputDirectoryTree; + for (auto& It : OutputObject["d"sv]) + { + const CbObjectView DirectoryObject = It.AsObjectView(); + if (DirectoryObject["n"sv].AsString() == "Outputs"sv) + { + OutputDirectoryTree = BinaryData[DirectoryObject["h"sv].AsObjectAttachment()]; + break; + } + } + + if (OutputDirectoryTree.GetSize() == 0) + { + ApplyResult.Error.Reason = "Outputs directory not found in task results"; + return ApplyResult; + } + + // load build.output as CbObject + + // Move Outputs from Horde to CbPackage + + std::unordered_map<IoHash, IoHash> CidToCompressedId; + CbPackage OutputPackage; + CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree); + + for (auto& It : OutputDirectoryTreeObject["f"sv]) + { + CbObjectView FileObject = It.AsObjectView(); + // Name is the uncompressed hash + IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString()); + // Hash is the compressed data hash, and how it is stored in Horde + IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); + + if (!BinaryData.contains(CompressedId)) + { + Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId); + ApplyResult.Error.Reason = "Object attachment chunk not retrieved from Horde"; + return ApplyResult; + } + CidToCompressedId[DecompressedId] = CompressedId; + } + + // Iterate attachments, verify all chunks exist, and add to CbPackage + bool AnyErrors = false; + CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput); + BuildOutputObject.IterateAttachments([&](CbFieldView Field) { + const IoHash DecompressedId = Field.AsHash(); + if (!CidToCompressedId.contains(DecompressedId)) + { + Log().warn("Attachment not found {}", DecompressedId); + AnyErrors = true; + return; + } + const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); + + if (!BinaryData.contains(CompressedId)) + { + Log().warn("Missing output {} compressed {} uncompressed", CompressedId, DecompressedId); + AnyErrors = true; + return; + } + + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer AttachmentBuffer = + CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId]), RawHash, RawSize); + + if (!AttachmentBuffer || RawHash != DecompressedId) + { + Log().warn( + "Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", + CompressedId, + DecompressedId); + AnyErrors = true; + return; + } + + ApplyResult.TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + ApplyResult.TotalRawAttachmentBytes += RawSize; + + CbAttachment Attachment(AttachmentBuffer, DecompressedId); + OutputPackage.AddAttachment(Attachment); + }); + + if (AnyErrors) + { + ApplyResult.Error.Reason = "Failed to get result object attachment data"; + return ApplyResult; + } + + OutputPackage.SetObject(BuildOutputObject); + ApplyResult.OutputPackage = std::move(OutputPackage); + + ApplyResult.Success = ApplyResult.Error.ErrorCode == 0; + return ApplyResult; + } + break; + } + + ApplyResult.Error.Reason = "Unknown apply type"; + return ApplyResult; + } + catch (std::exception& Err) + { + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + } + } + + [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) + { + std::string ExecutablePath; + std::string WorkingDirectory; + std::vector<std::string> Arguments; + std::map<std::string, std::string> Environment; + std::set<std::filesystem::path> InputFiles; + std::set<std::string> Outputs; + std::map<std::filesystem::path, IoHash> InputFileHashes; + + ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString(); + if (ExecutablePath.empty()) + { + Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor", + ApplyRecord.WorkerDescriptor.GetHash()); + return false; + } + + WorkingDirectory = ApplyRecord.WorkerDescriptor["workdir"sv].AsString(); + + for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv]) + { + CbObjectView FileEntry = It.AsObjectView(); + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds)) + { + return false; + } + } + + for (auto& It : ApplyRecord.WorkerDescriptor["files"sv]) + { + CbObjectView FileEntry = It.AsObjectView(); + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds)) + { + return false; + } + } + + for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv]) + { + std::string_view Directory = It.AsString(); + std::string DummyFile = fmt::format("{}/.zen_empty_file", Directory); + InputFiles.insert(DummyFile); + Data.Blobs[EmptyBufferId] = EmptyBuffer; + InputFileHashes[DummyFile] = EmptyBufferId; + } + + if (!WorkingDirectory.empty()) + { + std::string DummyFile = fmt::format("{}/.zen_empty_file", WorkingDirectory); + InputFiles.insert(DummyFile); + Data.Blobs[EmptyBufferId] = EmptyBuffer; + InputFileHashes[DummyFile] = EmptyBufferId; + } + + for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv]) + { + std::string_view Env = It.AsString(); + auto Index = Env.find('='); + if (Index == std::string_view::npos) + { + Log().warn("process apply upstream FAILED, environment '{}' malformed", Env); + return false; + } + + Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1); + } + + switch (ApplyRecord.Type) + { + case UpstreamApplyType::Simple: + { + for (auto& It : ApplyRecord.WorkerDescriptor["arguments"sv]) + { + Arguments.push_back(std::string(It.AsString())); + } + + for (auto& It : ApplyRecord.WorkerDescriptor["outputs"sv]) + { + Outputs.insert(std::string(It.AsString())); + } + } + break; + case UpstreamApplyType::Asset: + { + static const std::filesystem::path BuildActionPath = "Build.action"sv; + static const std::filesystem::path InputPath = "Inputs"sv; + const IoHash ActionId = ApplyRecord.Action.GetHash(); + + Arguments.push_back("-Build=build.action"); + Outputs.insert("Build.output"); + Outputs.insert("Outputs"); + + InputFiles.insert(BuildActionPath); + InputFileHashes[BuildActionPath] = ActionId; + Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(), + ApplyRecord.Action.GetBuffer().GetSize()); + + bool AnyErrors = false; + ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { + const IoHash Cid = Field.AsHash(); + const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; + + if (!m_CidStore.ContainsChunk(Cid)) + { + Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); + AnyErrors = true; + return; + } + + if (InputFiles.contains(FilePath)) + { + return; + } + + InputFiles.insert(FilePath); + InputFileHashes[FilePath] = Cid; + Data.Cids.insert(Cid); + }); + + if (AnyErrors) + { + return false; + } + } + break; + } + + const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles); + + CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Cids, Data.Objects); + const IoHash SandboxHash = Sandbox.GetHash(); + Data.Objects[SandboxHash] = std::move(Sandbox); + + { + std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); + if (HostPlatform.empty()) + { + Log().warn("process apply upstream FAILED, 'host' platform not provided"); + return false; + } + + int32_t LogicalCores = ApplyRecord.WorkerDescriptor["cores"sv].AsInt32(); + int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64(); + bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); + + std::string Condition = fmt::format("Platform == '{}'", HostPlatform); + if (HostPlatform == "Win64") + { + // TODO + // Condition += " && Pool == 'Win-RemoteExec'"; + } + + std::map<std::string_view, int64_t> Resources; + if (LogicalCores > 0) + { + Resources["LogicalCores"sv] = LogicalCores; + } + if (Memory > 0) + { + Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL); + } + + CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive); + const IoHash RequirementsId = Requirements.GetHash(); + Data.Objects[RequirementsId] = std::move(Requirements); + Data.RequirementsId = RequirementsId; + } + + CbObject Task = BuildTask(ExecutablePath, Arguments, Environment, WorkingDirectory, SandboxHash, Data.RequirementsId, Outputs); + + const IoHash TaskId = Task.GetHash(); + Data.Objects[TaskId] = std::move(Task); + Data.TaskId = TaskId; + + return true; + } + + [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry, + std::set<std::filesystem::path>& InputFiles, + std::map<std::filesystem::path, IoHash>& InputFileHashes, + std::set<IoHash>& CasIds) + { + const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); + const IoHash ChunkId = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); + + if (!m_CidStore.ContainsChunk(ChunkId)) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); + return false; + } + + if (InputFiles.contains(FilePath)) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath); + return false; + } + + InputFiles.insert(FilePath); + InputFileHashes[FilePath] = ChunkId; + CasIds.insert(ChunkId); + return true; + } + + [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles) + { + static const std::filesystem::path RootPath; + std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories; + UpstreamDirectory RootDirectory = {.Path = RootPath}; + + AllDirectories[RootPath] = &RootDirectory; + + // Build tree from flat list + for (const auto& Path : InputFiles) + { + if (Path.has_parent_path()) + { + if (!AllDirectories.contains(Path.parent_path())) + { + std::stack<std::string> PathSplit; + { + std::filesystem::path ParentPath = Path.parent_path(); + PathSplit.push(ParentPath.filename().string()); + while (ParentPath.has_parent_path()) + { + ParentPath = ParentPath.parent_path(); + PathSplit.push(ParentPath.filename().string()); + } + } + UpstreamDirectory* ParentPtr = &RootDirectory; + while (!PathSplit.empty()) + { + if (!ParentPtr->Directories.contains(PathSplit.top())) + { + std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()}; + ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath}; + AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()]; + } + ParentPtr = &ParentPtr->Directories[PathSplit.top()]; + PathSplit.pop(); + } + } + + AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string()); + } + else + { + RootDirectory.Files.insert(Path.filename().string()); + } + } + + return RootDirectory; + } + + [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory, + const std::map<std::filesystem::path, IoHash>& InputFileHashes, + const std::set<IoHash>& Cids, + std::map<IoHash, CbObject>& Objects) + { + CbObjectWriter DirectoryTreeWriter; + + if (!RootDirectory.Files.empty()) + { + DirectoryTreeWriter.BeginArray("f"sv); + for (const auto& File : RootDirectory.Files) + { + const std::filesystem::path FilePath = {RootDirectory.Path / File}; + const IoHash& FileHash = InputFileHashes.at(FilePath); + const bool Compressed = Cids.contains(FileHash); + DirectoryTreeWriter.BeginObject(); + DirectoryTreeWriter.AddString("n"sv, File); + DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash); + DirectoryTreeWriter.AddBool("c"sv, Compressed); + DirectoryTreeWriter.EndObject(); + } + DirectoryTreeWriter.EndArray(); + } + + if (!RootDirectory.Directories.empty()) + { + DirectoryTreeWriter.BeginArray("d"sv); + for (const auto& Item : RootDirectory.Directories) + { + CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Cids, Objects); + const IoHash DirectoryHash = Directory.GetHash(); + Objects[DirectoryHash] = std::move(Directory); + + DirectoryTreeWriter.BeginObject(); + DirectoryTreeWriter.AddString("n"sv, Item.first); + DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash); + DirectoryTreeWriter.EndObject(); + } + DirectoryTreeWriter.EndArray(); + } + + return DirectoryTreeWriter.Save(); + } + + void ResolveMerkleTreeDirectory(const std::filesystem::path& ParentDirectory, + const IoHash& DirectoryHash, + const std::map<IoHash, IoBuffer>& Objects, + std::map<std::filesystem::path, IoHash>& OutputFiles) + { + CbObject Directory = LoadCompactBinaryObject(Objects.at(DirectoryHash)); + + for (auto& It : Directory["f"sv]) + { + const CbObjectView FileObject = It.AsObjectView(); + const std::filesystem::path Path = ParentDirectory / FileObject["n"sv].AsString(); + + OutputFiles[Path] = FileObject["h"sv].AsBinaryAttachment(); + } + + for (auto& It : Directory["d"sv]) + { + const CbObjectView DirectoryObject = It.AsObjectView(); + + ResolveMerkleTreeDirectory(ParentDirectory / DirectoryObject["n"sv].AsString(), + DirectoryObject["h"sv].AsObjectAttachment(), + Objects, + OutputFiles); + } + } + + [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, + const std::map<std::string_view, int64_t>& Resources, + const bool Exclusive) + { + CbObjectWriter Writer; + Writer.AddString("c", Condition); + if (!Resources.empty()) + { + Writer.BeginArray("r"); + for (const auto& Resource : Resources) + { + Writer.BeginArray(); + Writer.AddString(Resource.first); + Writer.AddInteger(Resource.second); + Writer.EndArray(); + } + Writer.EndArray(); + } + Writer.AddBool("e", Exclusive); + return Writer.Save(); + } + + [[nodiscard]] CbObject BuildTask(const std::string_view Executable, + const std::vector<std::string>& Arguments, + const std::map<std::string, std::string>& Environment, + const std::string_view WorkingDirectory, + const IoHash& SandboxHash, + const IoHash& RequirementsId, + const std::set<std::string>& Outputs) + { + CbObjectWriter TaskWriter; + TaskWriter.AddString("e"sv, Executable); + + if (!Arguments.empty()) + { + TaskWriter.BeginArray("a"sv); + for (const auto& Argument : Arguments) + { + TaskWriter.AddString(Argument); + } + TaskWriter.EndArray(); + } + + if (!Environment.empty()) + { + TaskWriter.BeginArray("v"sv); + for (const auto& Env : Environment) + { + TaskWriter.BeginArray(); + TaskWriter.AddString(Env.first); + TaskWriter.AddString(Env.second); + TaskWriter.EndArray(); + } + TaskWriter.EndArray(); + } + + if (!WorkingDirectory.empty()) + { + TaskWriter.AddString("w"sv, WorkingDirectory); + } + + TaskWriter.AddObjectAttachment("s"sv, SandboxHash); + TaskWriter.AddObjectAttachment("r"sv, RequirementsId); + + // Outputs + if (!Outputs.empty()) + { + TaskWriter.BeginArray("o"sv); + for (const auto& Output : Outputs) + { + TaskWriter.AddString(Output); + } + TaskWriter.EndArray(); + } + + return TaskWriter.Save(); + } + }; +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<UpstreamApplyEndpoint> +UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CidStore& CidStore, + AuthMgr& Mgr) +{ + return std::make_unique<detail::HordeUpstreamApplyEndpoint>(ComputeOptions, + ComputeAuthConfig, + StorageOptions, + StorageAuthConfig, + CidStore, + Mgr); +} + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES |