// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include "compute/apply.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include # include namespace zen { class CbObjectWriter; class CasStore; class CidStore; class ZenCacheStore; struct CloudCacheClientOptions; class CloudCacheTokenProvider; struct UpstreamAuthConfig; class AuthMgr; enum class UpstreamApplyState : int32_t { Queued = 0, Executing = 1, Complete = 2, }; struct UpstreamApplyRecord { CbObject WorkerDescriptor; CbObject Action; }; struct UpstreamApplyOptions { std::chrono::seconds HealthCheckInterval{5}; std::chrono::seconds UpdatesInterval{5}; uint32_t ThreadCount = 4; bool StatsEnabled = false; }; struct UpstreamApplyError { int32_t ErrorCode{}; std::string Reason{}; explicit operator bool() const { return ErrorCode != 0; } }; struct PostUpstreamApplyResult { UpstreamApplyError Error{}; int64_t Bytes{}; double ElapsedSeconds{}; bool Success = false; }; struct GetUpstreamApplyResult { CbPackage OutputPackage{}; int64_t TotalAttachmentBytes{}; int64_t TotalRawAttachmentBytes{}; UpstreamApplyError Error{}; int64_t Bytes{}; double ElapsedSeconds{}; std::string StdOut{}; std::string StdErr{}; bool Success = false; }; using UpstreamApplyCompleted = std::unordered_map>; struct GetUpstreamApplyUpdatesResult { UpstreamApplyError Error{}; int64_t Bytes{}; double ElapsedSeconds{}; UpstreamApplyCompleted Completed{}; bool Success = false; }; struct UpstreamApplyStatus { UpstreamApplyState State{}; GetUpstreamApplyResult Result{}; std::chrono::steady_clock::time_point ExpireTime{}; }; using UpstreamApplyTasks = std::unordered_map>; struct UpstreamEndpointHealth { std::string Reason; bool Ok = false; }; struct UpstreamApplyEndpointStats { std::atomic_uint64_t PostCount{}; std::atomic_uint64_t CompleteCount{}; std::atomic_uint64_t UpdateCount{}; std::atomic_uint64_t ErrorCount{}; std::atomic UpBytes{}; std::atomic DownBytes{}; std::atomic SecondsUp{}; std::atomic SecondsDown{}; }; /** * The upstream apply endpoint is responsible for handling remote execution. */ class UpstreamApplyEndpoint { public: virtual ~UpstreamApplyEndpoint() = default; virtual UpstreamEndpointHealth Initialize() = 0; virtual bool IsHealthy() const = 0; virtual UpstreamEndpointHealth CheckHealth() = 0; virtual std::string_view DisplayName() const = 0; virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0; virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0; virtual UpstreamApplyEndpointStats& Stats() = 0; static std::unique_ptr CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, const UpstreamAuthConfig& ComputeAuthConfig, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& StorageAuthConfig, CasStore& CasStore, CidStore& CidStore, AuthMgr& Mgr); }; /** * Manages one or more upstream compute endpoints. */ class UpstreamApply { public: virtual ~UpstreamApply() = default; virtual bool Initialize() = 0; virtual void RegisterEndpoint(std::unique_ptr Endpoint) = 0; struct EnqueueResult { IoHash ApplyId{}; bool Success = false; }; struct StatusResult { UpstreamApplyStatus Status{}; bool Success = false; }; virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0; virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0; virtual void GetStatus(CbObjectWriter& CbO) = 0; static std::unique_ptr Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore); }; } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES