diff options
Diffstat (limited to 'zenserver/upstream/upstreamapply.h')
| -rw-r--r-- | zenserver/upstream/upstreamapply.h | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h new file mode 100644 index 000000000..8f72660c7 --- /dev/null +++ b/zenserver/upstream/upstreamapply.h @@ -0,0 +1,172 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinarypackage.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/zencore.h> + +#include <atomic> +#include <chrono> +#include <memory> +#include <unordered_map> +#include <unordered_set> + +namespace zen { + +class CbObjectWriter; +class CasStore; +class CidStore; +class ZenCacheStore; +struct CloudCacheClientOptions; + +enum class UpstreamApplyState : int32_t +{ + Queued = 0, + Executing = 1, + Complete = 2, +}; + +struct UpstreamApplyRecord +{ + CbObject WorkerDescriptor; + CbObject Action; +}; + +struct UpstreamApplyOptions +{ + std::chrono::seconds HealthCheckInterval{5}; + std::chrono::seconds UpdatesInterval{5}; + uint32_t ThreadCount = 4; + bool StatsEnabled = false; +}; + +struct UpstreamApplyError +{ + int32_t ErrorCode{}; + std::string Reason{}; + + explicit operator bool() const { return ErrorCode != 0; } +}; + +struct PostUpstreamApplyResult +{ + UpstreamApplyError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; +}; + +struct GetUpstreamApplyResult +{ + CbPackage OutputPackage{}; + int64_t TotalAttachmentBytes{}; + int64_t TotalRawAttachmentBytes{}; + UpstreamApplyError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + std::string StdOut{}; + std::string StdErr{}; + bool Success = false; +}; + +using UpstreamApplyCompleted = std::unordered_map<IoHash, std::unordered_map<IoHash, GetUpstreamApplyResult>>; + +struct GetUpstreamApplyUpdatesResult +{ + UpstreamApplyError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + UpstreamApplyCompleted Completed{}; + bool Success = false; +}; + +struct UpstreamApplyStatus +{ + UpstreamApplyState State{}; + GetUpstreamApplyResult Result{}; + std::chrono::steady_clock::time_point ExpireTime{}; +}; + +using UpstreamApplyTasks = std::unordered_map<IoHash, std::unordered_map<IoHash, UpstreamApplyStatus>>; + +struct UpstreamEndpointHealth +{ + std::string Reason; + bool Ok = false; +}; + +struct UpstreamApplyEndpointStats +{ + std::atomic_uint64_t PostCount{}; + std::atomic_uint64_t CompleteCount{}; + std::atomic_uint64_t UpdateCount{}; + std::atomic_uint64_t ErrorCount{}; + std::atomic<double> UpBytes{}; + std::atomic<double> DownBytes{}; + std::atomic<double> SecondsUp{}; + std::atomic<double> SecondsDown{}; +}; + +/** + * The upstream apply endpont is responsible for handling remote execution. + */ +class UpstreamApplyEndpoint +{ +public: + virtual ~UpstreamApplyEndpoint() = default; + + virtual UpstreamEndpointHealth Initialize() = 0; + + virtual bool IsHealthy() const = 0; + + virtual UpstreamEndpointHealth CheckHealth() = 0; + + virtual std::string_view DisplayName() const = 0; + + virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0; + + virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0; + + virtual UpstreamApplyEndpointStats& Stats() = 0; +}; + +/** + * Manages one or more upstream cache endpoints. + */ +class UpstreamApply +{ +public: + virtual ~UpstreamApply() = default; + + virtual bool Initialize() = 0; + + virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) = 0; + + struct EnqueueResult + { + IoHash ApplyId{}; + bool Success = false; + }; + + struct StatusResult + { + UpstreamApplyStatus Status{}; + bool Success = false; + }; + + virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0; + + virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0; + + virtual void GetStatus(CbObjectWriter& CbO) = 0; +}; + +std::unique_ptr<UpstreamApply> MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore); + +std::unique_ptr<UpstreamApplyEndpoint> MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, + CasStore& CasStore, + CidStore& CidStore); + +} // namespace zen |