// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include # include namespace zen { class AuthMgr; class CbObjectWriter; class CidStore; class CloudCacheTokenProvider; class WorkerThreadPool; class ZenCacheNamespace; struct CloudCacheClientOptions; struct UpstreamAuthConfig; enum class UpstreamApplyState : int32_t { Queued = 0, Executing = 1, Complete = 2, }; enum class UpstreamApplyType { Simple = 0, Asset = 1, }; struct UpstreamApplyRecord { CbObject WorkerDescriptor; CbObject Action; UpstreamApplyType Type; std::map Timepoints{}; }; struct UpstreamApplyOptions { std::chrono::seconds HealthCheckInterval{5}; std::chrono::seconds UpdatesInterval{5}; uint32_t UpstreamThreadCount = 4; uint32_t DownstreamThreadCount = 4; bool StatsEnabled = false; }; struct UpstreamApplyError { int32_t ErrorCode{}; std::string Reason{}; explicit operator bool() const { return ErrorCode != 0; } }; struct PostUpstreamApplyResult { UpstreamApplyError Error{}; int64_t Bytes{}; double ElapsedSeconds{}; std::map Timepoints{}; bool Success = false; }; struct GetUpstreamApplyResult { // UpstreamApplyType::Simple std::map OutputFiles{}; std::map FileData{}; // UpstreamApplyType::Asset CbPackage OutputPackage{}; int64_t TotalAttachmentBytes{}; int64_t TotalRawAttachmentBytes{}; UpstreamApplyError Error{}; int64_t Bytes{}; double ElapsedSeconds{}; std::string StdOut{}; std::string StdErr{}; std::string Agent{}; std::string Detail{}; std::map Timepoints{}; bool Success = false; }; using UpstreamApplyCompleted = std::unordered_map>; struct GetUpstreamApplyUpdatesResult { UpstreamApplyError Error{}; int64_t Bytes{}; double ElapsedSeconds{}; UpstreamApplyCompleted Completed{}; bool Success = false; }; struct UpstreamApplyStatus { UpstreamApplyState State{}; GetUpstreamApplyResult Result{}; std::chrono::steady_clock::time_point ExpireTime{}; std::map Timepoints{}; }; using UpstreamApplyTasks = std::unordered_map>; struct UpstreamEndpointHealth { std::string Reason; bool Ok = false; }; struct UpstreamApplyEndpointStats { metrics::Counter PostCount; metrics::Counter CompleteCount; metrics::Counter UpdateCount; metrics::Counter ErrorCount; metrics::Counter UpBytes; metrics::Counter DownBytes; }; /** * The upstream apply endpoint is responsible for handling remote execution. */ class UpstreamApplyEndpoint { public: virtual ~UpstreamApplyEndpoint() = default; virtual UpstreamEndpointHealth Initialize() = 0; virtual bool IsHealthy() const = 0; virtual UpstreamEndpointHealth CheckHealth() = 0; virtual std::string_view DisplayName() const = 0; virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) = 0; virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) = 0; virtual UpstreamApplyEndpointStats& Stats() = 0; static std::unique_ptr CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, const UpstreamAuthConfig& ComputeAuthConfig, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& StorageAuthConfig, CidStore& CidStore, AuthMgr& Mgr); }; /** * Manages one or more upstream compute endpoints. */ class UpstreamApply { public: virtual ~UpstreamApply() = default; virtual bool Initialize() = 0; virtual bool IsHealthy() const = 0; virtual void RegisterEndpoint(std::unique_ptr Endpoint) = 0; struct EnqueueResult { IoHash ApplyId{}; bool Success = false; }; struct StatusResult { UpstreamApplyStatus Status{}; bool Success = false; }; virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0; virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0; virtual void GetStatus(CbObjectWriter& CbO) = 0; static std::unique_ptr Create(const UpstreamApplyOptions& Options, CidStore& CidStore); }; } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES