aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamapply.h
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/upstream/upstreamapply.h')
-rw-r--r--zenserver/upstream/upstreamapply.h172
1 files changed, 172 insertions, 0 deletions
diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h
new file mode 100644
index 000000000..8f72660c7
--- /dev/null
+++ b/zenserver/upstream/upstreamapply.h
@@ -0,0 +1,172 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinarypackage.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/zencore.h>
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <unordered_map>
+#include <unordered_set>
+
+namespace zen {
+
+class CbObjectWriter;
+class CasStore;
+class CidStore;
+class ZenCacheStore;
+struct CloudCacheClientOptions;
+
+enum class UpstreamApplyState : int32_t
+{
+ Queued = 0,
+ Executing = 1,
+ Complete = 2,
+};
+
+struct UpstreamApplyRecord
+{
+ CbObject WorkerDescriptor;
+ CbObject Action;
+};
+
+struct UpstreamApplyOptions
+{
+ std::chrono::seconds HealthCheckInterval{5};
+ std::chrono::seconds UpdatesInterval{5};
+ uint32_t ThreadCount = 4;
+ bool StatsEnabled = false;
+};
+
+struct UpstreamApplyError
+{
+ int32_t ErrorCode{};
+ std::string Reason{};
+
+ explicit operator bool() const { return ErrorCode != 0; }
+};
+
+struct PostUpstreamApplyResult
+{
+ UpstreamApplyError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
+};
+
+struct GetUpstreamApplyResult
+{
+ CbPackage OutputPackage{};
+ int64_t TotalAttachmentBytes{};
+ int64_t TotalRawAttachmentBytes{};
+ UpstreamApplyError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ std::string StdOut{};
+ std::string StdErr{};
+ bool Success = false;
+};
+
+using UpstreamApplyCompleted = std::unordered_map<IoHash, std::unordered_map<IoHash, GetUpstreamApplyResult>>;
+
+struct GetUpstreamApplyUpdatesResult
+{
+ UpstreamApplyError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ UpstreamApplyCompleted Completed{};
+ bool Success = false;
+};
+
+struct UpstreamApplyStatus
+{
+ UpstreamApplyState State{};
+ GetUpstreamApplyResult Result{};
+ std::chrono::steady_clock::time_point ExpireTime{};
+};
+
+using UpstreamApplyTasks = std::unordered_map<IoHash, std::unordered_map<IoHash, UpstreamApplyStatus>>;
+
+struct UpstreamEndpointHealth
+{
+ std::string Reason;
+ bool Ok = false;
+};
+
+struct UpstreamApplyEndpointStats
+{
+ std::atomic_uint64_t PostCount{};
+ std::atomic_uint64_t CompleteCount{};
+ std::atomic_uint64_t UpdateCount{};
+ std::atomic_uint64_t ErrorCount{};
+ std::atomic<double> UpBytes{};
+ std::atomic<double> DownBytes{};
+ std::atomic<double> SecondsUp{};
+ std::atomic<double> SecondsDown{};
+};
+
+/**
+ * The upstream apply endpont is responsible for handling remote execution.
+ */
+class UpstreamApplyEndpoint
+{
+public:
+ virtual ~UpstreamApplyEndpoint() = default;
+
+ virtual UpstreamEndpointHealth Initialize() = 0;
+
+ virtual bool IsHealthy() const = 0;
+
+ virtual UpstreamEndpointHealth CheckHealth() = 0;
+
+ virtual std::string_view DisplayName() const = 0;
+
+ virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0;
+
+ virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0;
+
+ virtual UpstreamApplyEndpointStats& Stats() = 0;
+};
+
+/**
+ * Manages one or more upstream cache endpoints.
+ */
+class UpstreamApply
+{
+public:
+ virtual ~UpstreamApply() = default;
+
+ virtual bool Initialize() = 0;
+
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) = 0;
+
+ struct EnqueueResult
+ {
+ IoHash ApplyId{};
+ bool Success = false;
+ };
+
+ struct StatusResult
+ {
+ UpstreamApplyStatus Status{};
+ bool Success = false;
+ };
+
+ virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0;
+
+ virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0;
+
+ virtual void GetStatus(CbObjectWriter& CbO) = 0;
+};
+
+std::unique_ptr<UpstreamApply> MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore);
+
+std::unique_ptr<UpstreamApplyEndpoint> MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options,
+ CasStore& CasStore,
+ CidStore& CidStore);
+
+} // namespace zen