path: root/src/zenserver/upstream/upstreamapply.cpp
author     Stefan Boberg <[email protected]>  2023-05-02 10:01:47 +0200
committer  GitHub <[email protected]>          2023-05-02 10:01:47 +0200
commit     075d17f8ada47e990fe94606c3d21df409223465 (patch)
tree       e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/upstream/upstreamapply.cpp
parent     fix: bundle shouldn't append content zip to zen (diff)
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver/upstream/upstreamapply.cpp')
-rw-r--r--  src/zenserver/upstream/upstreamapply.cpp  459
1 file changed, 459 insertions, 0 deletions
diff --git a/src/zenserver/upstream/upstreamapply.cpp b/src/zenserver/upstream/upstreamapply.cpp
new file mode 100644
index 000000000..c719b225d
--- /dev/null
+++ b/src/zenserver/upstream/upstreamapply.cpp
@@ -0,0 +1,459 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "upstreamapply.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/fmtutils.h>
+# include <zencore/stream.h>
+# include <zencore/timer.h>
+# include <zencore/workthreadpool.h>
+
+# include <zenstore/cidstore.h>
+
+# include "diag/logging.h"
+
+# include <fmt/format.h>
+
+# include <atomic>
+
+namespace zen {
+
+using namespace std::literals;
+
+struct UpstreamApplyStats
+{
+ static constexpr uint64_t MaxSampleCount = 1000ull;
+
+    explicit UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {}
+
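+    // Note: byte counters are accumulated in MiB (Bytes / 1024 / 1024); they
+    // are surfaced as the "uploaded_mb"/"downloaded_mb" status fields below.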
+ void Add(UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result)
+    {
+        if (!m_Enabled)
+        {
+            return; // stats collection disabled via Options.StatsEnabled
+        }
+
+        UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
+
+ if (Result.Error)
+ {
+ Stats.ErrorCount.Increment(1);
+ }
+ else if (Result.Success)
+ {
+ Stats.PostCount.Increment(1);
+ Stats.UpBytes.Increment(Result.Bytes / 1024 / 1024);
+ }
+ }
+
+ void Add(UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result)
+    {
+        if (!m_Enabled)
+        {
+            return; // stats collection disabled via Options.StatsEnabled
+        }
+
+        UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
+
+ if (Result.Error)
+ {
+ Stats.ErrorCount.Increment(1);
+ }
+ else if (Result.Success)
+ {
+ Stats.UpdateCount.Increment(1);
+ Stats.DownBytes.Increment(Result.Bytes / 1024 / 1024);
+ if (!Result.Completed.empty())
+ {
+ uint64_t Completed = 0;
+ for (auto& It : Result.Completed)
+ {
+ Completed += It.second.size();
+ }
+ Stats.CompleteCount.Increment(Completed);
+ }
+ }
+ }
+
+ bool m_Enabled;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+class UpstreamApplyImpl final : public UpstreamApply
+{
+public:
+ UpstreamApplyImpl(const UpstreamApplyOptions& Options, CidStore& CidStore)
+ : m_Log(logging::Get("upstream-apply"))
+ , m_Options(Options)
+ , m_CidStore(CidStore)
+ , m_Stats(Options.StatsEnabled)
+ , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount)
+ , m_DownstreamAsyncWorkPool(Options.DownstreamThreadCount)
+ {
+ }
+
+ virtual ~UpstreamApplyImpl() { Shutdown(); }
+
+ virtual bool Initialize() override
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ const UpstreamEndpointHealth Health = Endpoint->Initialize();
+ if (Health.Ok)
+ {
+ Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName());
+ }
+ else
+ {
+ Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ }
+ }
+
+ m_RunState.IsRunning = !m_Endpoints.empty();
+
+ if (m_RunState.IsRunning)
+ {
+ m_ShutdownEvent.Reset();
+
+ m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this);
+
+ m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this);
+ }
+
+ return m_RunState.IsRunning;
+ }
+
+ virtual bool IsHealthy() const override
+ {
+ if (m_RunState.IsRunning)
+ {
+ for (const auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy())
+ {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override
+ {
+ m_Endpoints.emplace_back(std::move(Endpoint));
+ }
+
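+    // Enqueue an apply keyed by (WorkerId, ActionId). Re-submitting an apply
+    // that is already tracked is reported as success without queueing a
+    // duplicate; new entries get an expiry deadline from the worker
+    // descriptor's "timeout" field (default 300 seconds, 0 disables expiry).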
+ virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override
+ {
+ if (m_RunState.IsRunning)
+ {
+ const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash();
+ const IoHash ActionId = ApplyRecord.Action.GetHash();
+ const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300);
+
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ // Already in progress
+ return {.ApplyId = ActionId, .Success = true};
+ }
+
+ std::chrono::steady_clock::time_point ExpireTime =
+ TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds)
+ : std::chrono::steady_clock::time_point::max();
+
+ m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)};
+ }
+
+ ApplyRecord.Timepoints["zen-queue-added"] = DateTime::NowTicks();
+ m_UpstreamAsyncWorkPool.ScheduleWork(
+ [this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); });
+
+ return {.ApplyId = ActionId, .Success = true};
+ }
+
+ return {};
+ }
+
+ virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override
+ {
+ if (m_RunState.IsRunning)
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ return {.Status = *Status, .Success = true};
+ }
+ }
+
+ return {};
+ }
+
+ virtual void GetStatus(CbObjectWriter& Status) override
+ {
+ Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount;
+ Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWork();
+ Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount;
+ Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWork();
+
+ Status.BeginArray("endpoints");
+ for (const auto& Ep : m_Endpoints)
+ {
+ Status.BeginObject();
+ Status << "name" << Ep->DisplayName();
+ Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
+
+ UpstreamApplyEndpointStats& Stats = Ep->Stats();
+ const uint64_t PostCount = Stats.PostCount.Value();
+ const uint64_t CompleteCount = Stats.CompleteCount.Value();
+            // Fraction of posted applies observed as complete so far
+            const double CompleteRate = PostCount > 0 ? (double(CompleteCount) / double(PostCount)) : 0.0;
+
+ Status << "post_count" << PostCount;
+            Status << "complete_count" << CompleteCount;
+ Status << "update_count" << Stats.UpdateCount.Value();
+
+ Status << "complete_ratio" << CompleteRate;
+ Status << "downloaded_mb" << Stats.DownBytes.Value();
+ Status << "uploaded_mb" << Stats.UpBytes.Value();
+ Status << "error_count" << Stats.ErrorCount.Value();
+
+ Status.EndObject();
+ }
+ Status.EndArray();
+ }
+
+private:
+ // The caller is responsible for locking if required
+ UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId)
+ {
+ if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end())
+ {
+ if (auto It2 = It->second.find(ActionId); It2 != It->second.end())
+ {
+ return &It2->second;
+ }
+ }
+ return nullptr;
+ }
+
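+    // Worker-pool task: posts the record to the first healthy endpoint. A
+    // successful post moves the task to Executing (completion arrives later
+    // via ProcessApplyUpdates()); a failed post, or the absence of any
+    // healthy endpoint, completes the task immediately with an error.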
+ void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord)
+ {
+ const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash();
+ const IoHash ActionId = ApplyRecord.Action.GetHash();
+ try
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy())
+ {
+ ApplyRecord.Timepoints["zen-queue-dispatched"] = DateTime::NowTicks();
+ PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord));
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ Status->Timepoints.merge(Result.Timepoints);
+
+ if (Result.Success)
+ {
+ Status->State = UpstreamApplyState::Executing;
+ }
+ else
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = {.Error = std::move(Result.Error),
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds};
+ }
+ }
+ }
+ m_Stats.Add(*Endpoint, Result);
+ return;
+ }
+ }
+
+ Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId);
+
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}};
+ }
+ }
+ }
+ catch (std::exception& e)
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}};
+ }
+ Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what());
+ }
+ }
+
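+    // Polls every healthy endpoint for progress. Completed applies are moved
+    // into the task map; timepoints gathered while the task was in flight are
+    // merged into the final result before the scratch copy is cleared.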
+ void ProcessApplyUpdates()
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy())
+ {
+ GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_DownstreamAsyncWorkPool);
+ m_Stats.Add(*Endpoint, Result);
+
+ if (!Result.Success)
+ {
+ Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason);
+ }
+
+ if (!Result.Completed.empty())
+ {
+ for (auto& It : Result.Completed)
+ {
+ for (auto& It2 : It.second)
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(It.first, It2.first); Status != nullptr)
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = std::move(It2.second);
+ Status->Result.Timepoints.merge(Status->Timepoints);
+ Status->Result.Timepoints["zen-queue-complete"] = DateTime::NowTicks();
+ Status->Timepoints.clear();
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
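+    // Body of m_UpstreamUpdatesThread: wakes every UpdatesInterval
+    // milliseconds (or immediately on shutdown), polls for updates, then
+    // garbage-collects expired tasks and empty per-worker maps.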
+ void ProcessUpstreamUpdates()
+ {
+        const auto UpdateSleep = std::chrono::milliseconds(m_Options.UpdatesInterval);
+ while (!m_ShutdownEvent.Wait(uint32_t(UpdateSleep.count())))
+ {
+ if (!m_RunState.IsRunning)
+ {
+ break;
+ }
+
+ ProcessApplyUpdates();
+
+ // Remove any expired tasks, regardless of state
+ {
+            std::scoped_lock Lock(m_ApplyTasksMutex);
+            const auto       Now = std::chrono::steady_clock::now();
+            for (auto& WorkerIt : m_ApplyTasks)
+            {
+                const auto Count = std::erase_if(
+                    WorkerIt.second, [Now](const auto& Item) { return Item.second.ExpireTime < Now; });
+ if (Count > 0)
+ {
+ Log().debug("Removed '{}' expired tasks", Count);
+ }
+ }
+ const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); });
+ if (Count > 0)
+ {
+ Log().debug("Removed '{}' empty task lists", Count);
+ }
+ }
+ }
+ }
+
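+    // Body of m_EndpointMonitorThread: every HealthCheckInterval, re-runs the
+    // health check for endpoints currently marked unhealthy so they can
+    // become eligible for posts again.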
+ void MonitorEndpoints()
+ {
+ for (;;)
+ {
+ {
+ std::unique_lock Lock(m_RunState.Mutex);
+ if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); }))
+ {
+ break;
+ }
+ }
+
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (!Endpoint->IsHealthy())
+ {
+ if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
+ {
+                    Log().info("health check endpoint '{}' OK", Endpoint->DisplayName());
+ }
+ else
+ {
+ Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ }
+ }
+ }
+ }
+ }
+
+ void Shutdown()
+ {
+ if (m_RunState.Stop())
+ {
+ m_ShutdownEvent.Set();
+ m_EndpointMonitorThread.join();
+ m_UpstreamUpdatesThread.join();
+ m_Endpoints.clear();
+ }
+ }
+
+ spdlog::logger& Log() { return m_Log; }
+
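+    // Shutdown handshake: Stop() flips IsRunning under Mutex before
+    // notifying, so MonitorEndpoints()'s wait_for predicate cannot miss the
+    // signal between its check and going to sleep.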
+ struct RunState
+ {
+ std::mutex Mutex;
+ std::condition_variable ExitSignal;
+ std::atomic_bool IsRunning{false};
+
+ bool Stop()
+ {
+ bool Stopped = false;
+ {
+ std::scoped_lock Lock(Mutex);
+ Stopped = IsRunning.exchange(false);
+ }
+ if (Stopped)
+ {
+ ExitSignal.notify_all();
+ }
+ return Stopped;
+ }
+ };
+
+ spdlog::logger& m_Log;
+ UpstreamApplyOptions m_Options;
+ CidStore& m_CidStore;
+ UpstreamApplyStats m_Stats;
+ UpstreamApplyTasks m_ApplyTasks;
+ std::mutex m_ApplyTasksMutex;
+ std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints;
+ Event m_ShutdownEvent;
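+    // Separate pools so that upstream posts (EnqueueUpstream) and downstream
+    // update processing are sized and scheduled independently.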
+ WorkerThreadPool m_UpstreamAsyncWorkPool;
+ WorkerThreadPool m_DownstreamAsyncWorkPool;
+ std::thread m_UpstreamUpdatesThread;
+ std::thread m_EndpointMonitorThread;
+ RunState m_RunState;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+bool
+UpstreamApply::IsHealthy() const
+{
+ return false;
+}
+
+std::unique_ptr<UpstreamApply>
+UpstreamApply::Create(const UpstreamApplyOptions& Options, CidStore& CidStore)
+{
+ return std::make_unique<UpstreamApplyImpl>(Options, CidStore);
+}
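+
+// A minimal usage sketch (hedged: 'MyEndpoint' stands in for a hypothetical
+// concrete UpstreamApplyEndpoint implementation; Options, Store, and Record
+// are assumed to be built by the caller):
+//
+//   std::unique_ptr<UpstreamApply> Apply = UpstreamApply::Create(Options, Store);
+//   Apply->RegisterEndpoint(std::make_unique<MyEndpoint>(/* ... */));
+//   if (Apply->Initialize())  // starts the update and monitor threads
+//   {
+//       const IoHash  WorkerId = Record.WorkerDescriptor.GetHash();
+//       EnqueueResult Queued   = Apply->EnqueueUpstream(std::move(Record));
+//       StatusResult  Status   = Apply->GetStatus(WorkerId, Queued.ApplyId);
+//   }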
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES