aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream/upstreamapply.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-10-11 10:03:54 +0200
committerGitHub <[email protected]>2023-10-11 10:03:54 +0200
commit1ad940fafb5e3eae7b308dd290b6de6ade69a3eb (patch)
tree1d1efe188f45bc422292e75c6784929765882771 /src/zenserver/upstream/upstreamapply.cpp
parentfix clang-format whoopsie (diff)
downloadzen-1ad940fafb5e3eae7b308dd290b6de6ade69a3eb.tar.xz
zen-1ad940fafb5e3eae7b308dd290b6de6ade69a3eb.zip
remove legacy compute interfaces (#461)
* removed legacy compute code, which will be replaced with a new implementation in the future * also updated references to Jupiter storage
Diffstat (limited to 'src/zenserver/upstream/upstreamapply.cpp')
-rw-r--r--src/zenserver/upstream/upstreamapply.cpp459
1 files changed, 0 insertions, 459 deletions
diff --git a/src/zenserver/upstream/upstreamapply.cpp b/src/zenserver/upstream/upstreamapply.cpp
deleted file mode 100644
index 3d29f2228..000000000
--- a/src/zenserver/upstream/upstreamapply.cpp
+++ /dev/null
@@ -1,459 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "upstreamapply.h"
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include <zencore/compactbinary.h>
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/fmtutils.h>
-# include <zencore/stream.h>
-# include <zencore/timer.h>
-# include <zencore/workthreadpool.h>
-
-# include <zenstore/cidstore.h>
-
-# include "diag/logging.h"
-
-# include <fmt/format.h>
-
-# include <atomic>
-
-namespace zen {
-
-using namespace std::literals;
-
-struct UpstreamApplyStats
-{
- static constexpr uint64_t MaxSampleCount = 1000ull;
-
- UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {}
-
- void Add(UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result)
- {
- UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
-
- if (Result.Error)
- {
- Stats.ErrorCount.Increment(1);
- }
- else if (Result.Success)
- {
- Stats.PostCount.Increment(1);
- Stats.UpBytes.Increment(Result.Bytes / 1024 / 1024);
- }
- }
-
- void Add(UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result)
- {
- UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
-
- if (Result.Error)
- {
- Stats.ErrorCount.Increment(1);
- }
- else if (Result.Success)
- {
- Stats.UpdateCount.Increment(1);
- Stats.DownBytes.Increment(Result.Bytes / 1024 / 1024);
- if (!Result.Completed.empty())
- {
- uint64_t Completed = 0;
- for (auto& It : Result.Completed)
- {
- Completed += It.second.size();
- }
- Stats.CompleteCount.Increment(Completed);
- }
- }
- }
-
- bool m_Enabled;
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-class UpstreamApplyImpl final : public UpstreamApply
-{
-public:
- UpstreamApplyImpl(const UpstreamApplyOptions& Options, CidStore& CidStore)
- : m_Log(logging::Get("upstream-apply"))
- , m_Options(Options)
- , m_CidStore(CidStore)
- , m_Stats(Options.StatsEnabled)
- , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount)
- , m_DownstreamAsyncWorkPool(Options.DownstreamThreadCount)
- {
- }
-
- virtual ~UpstreamApplyImpl() { Shutdown(); }
-
- virtual bool Initialize() override
- {
- for (auto& Endpoint : m_Endpoints)
- {
- const UpstreamEndpointHealth Health = Endpoint->Initialize();
- if (Health.Ok)
- {
- Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName());
- }
- else
- {
- Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
- }
- }
-
- m_RunState.IsRunning = !m_Endpoints.empty();
-
- if (m_RunState.IsRunning)
- {
- m_ShutdownEvent.Reset();
-
- m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this);
-
- m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this);
- }
-
- return m_RunState.IsRunning;
- }
-
- virtual bool IsHealthy() const override
- {
- if (m_RunState.IsRunning)
- {
- for (const auto& Endpoint : m_Endpoints)
- {
- if (Endpoint->IsHealthy())
- {
- return true;
- }
- }
- }
-
- return false;
- }
-
- virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override
- {
- m_Endpoints.emplace_back(std::move(Endpoint));
- }
-
- virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override
- {
- if (m_RunState.IsRunning)
- {
- const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash();
- const IoHash ActionId = ApplyRecord.Action.GetHash();
- const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300);
-
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
- {
- // Already in progress
- return {.ApplyId = ActionId, .Success = true};
- }
-
- std::chrono::steady_clock::time_point ExpireTime =
- TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds)
- : std::chrono::steady_clock::time_point::max();
-
- m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)};
- }
-
- ApplyRecord.Timepoints["zen-queue-added"] = DateTime::NowTicks();
- m_UpstreamAsyncWorkPool.ScheduleWork(
- [this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); });
-
- return {.ApplyId = ActionId, .Success = true};
- }
-
- return {};
- }
-
- virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override
- {
- if (m_RunState.IsRunning)
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
- {
- return {.Status = *Status, .Success = true};
- }
- }
-
- return {};
- }
-
- virtual void GetStatus(CbObjectWriter& Status) override
- {
- Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount;
- Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWorkItemCount();
- Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount;
- Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWorkItemCount();
-
- Status.BeginArray("endpoints");
- for (const auto& Ep : m_Endpoints)
- {
- Status.BeginObject();
- Status << "name" << Ep->DisplayName();
- Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
-
- UpstreamApplyEndpointStats& Stats = Ep->Stats();
- const uint64_t PostCount = Stats.PostCount.Value();
- const uint64_t CompleteCount = Stats.CompleteCount.Value();
- // const uint64_t UpdateCount = Stats.UpdateCount;
- const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
-
- Status << "post_count" << PostCount;
- Status << "complete_count" << PostCount;
- Status << "update_count" << Stats.UpdateCount.Value();
-
- Status << "complete_ratio" << CompleteRate;
- Status << "downloaded_mb" << Stats.DownBytes.Value();
- Status << "uploaded_mb" << Stats.UpBytes.Value();
- Status << "error_count" << Stats.ErrorCount.Value();
-
- Status.EndObject();
- }
- Status.EndArray();
- }
-
-private:
- // The caller is responsible for locking if required
- UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId)
- {
- if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end())
- {
- if (auto It2 = It->second.find(ActionId); It2 != It->second.end())
- {
- return &It2->second;
- }
- }
- return nullptr;
- }
-
- void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord)
- {
- const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash();
- const IoHash ActionId = ApplyRecord.Action.GetHash();
- try
- {
- for (auto& Endpoint : m_Endpoints)
- {
- if (Endpoint->IsHealthy())
- {
- ApplyRecord.Timepoints["zen-queue-dispatched"] = DateTime::NowTicks();
- PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord));
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
- {
- Status->Timepoints.merge(Result.Timepoints);
-
- if (Result.Success)
- {
- Status->State = UpstreamApplyState::Executing;
- }
- else
- {
- Status->State = UpstreamApplyState::Complete;
- Status->Result = {.Error = std::move(Result.Error),
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds};
- }
- }
- }
- m_Stats.Add(*Endpoint, Result);
- return;
- }
- }
-
- Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId);
-
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
- {
- Status->State = UpstreamApplyState::Complete;
- Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}};
- }
- }
- }
- catch (std::exception& e)
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
- {
- Status->State = UpstreamApplyState::Complete;
- Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}};
- }
- Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what());
- }
- }
-
- void ProcessApplyUpdates()
- {
- for (auto& Endpoint : m_Endpoints)
- {
- if (Endpoint->IsHealthy())
- {
- GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_DownstreamAsyncWorkPool);
- m_Stats.Add(*Endpoint, Result);
-
- if (!Result.Success)
- {
- Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason);
- }
-
- if (!Result.Completed.empty())
- {
- for (auto& It : Result.Completed)
- {
- for (auto& It2 : It.second)
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(It.first, It2.first); Status != nullptr)
- {
- Status->State = UpstreamApplyState::Complete;
- Status->Result = std::move(It2.second);
- Status->Result.Timepoints.merge(Status->Timepoints);
- Status->Result.Timepoints["zen-queue-complete"] = DateTime::NowTicks();
- Status->Timepoints.clear();
- }
- }
- }
- }
- }
- }
- }
-
- void ProcessUpstreamUpdates()
- {
- const auto& UpdateSleep = std::chrono::milliseconds(m_Options.UpdatesInterval);
- while (!m_ShutdownEvent.Wait(uint32_t(UpdateSleep.count())))
- {
- if (!m_RunState.IsRunning)
- {
- break;
- }
-
- ProcessApplyUpdates();
-
- // Remove any expired tasks, regardless of state
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- for (auto& WorkerIt : m_ApplyTasks)
- {
- const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) {
- return Item.second.ExpireTime < std::chrono::steady_clock::now();
- });
- if (Count > 0)
- {
- Log().debug("Removed '{}' expired tasks", Count);
- }
- }
- const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); });
- if (Count > 0)
- {
- Log().debug("Removed '{}' empty task lists", Count);
- }
- }
- }
- }
-
- void MonitorEndpoints()
- {
- for (;;)
- {
- {
- std::unique_lock Lock(m_RunState.Mutex);
- if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); }))
- {
- break;
- }
- }
-
- for (auto& Endpoint : m_Endpoints)
- {
- if (!Endpoint->IsHealthy())
- {
- if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
- {
- Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason);
- }
- else
- {
- Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
- }
- }
- }
- }
- }
-
- void Shutdown()
- {
- if (m_RunState.Stop())
- {
- m_ShutdownEvent.Set();
- m_EndpointMonitorThread.join();
- m_UpstreamUpdatesThread.join();
- m_Endpoints.clear();
- }
- }
-
- spdlog::logger& Log() { return m_Log; }
-
- struct RunState
- {
- std::mutex Mutex;
- std::condition_variable ExitSignal;
- std::atomic_bool IsRunning{false};
-
- bool Stop()
- {
- bool Stopped = false;
- {
- std::scoped_lock Lock(Mutex);
- Stopped = IsRunning.exchange(false);
- }
- if (Stopped)
- {
- ExitSignal.notify_all();
- }
- return Stopped;
- }
- };
-
- spdlog::logger& m_Log;
- UpstreamApplyOptions m_Options;
- CidStore& m_CidStore;
- UpstreamApplyStats m_Stats;
- UpstreamApplyTasks m_ApplyTasks;
- std::mutex m_ApplyTasksMutex;
- std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints;
- Event m_ShutdownEvent;
- WorkerThreadPool m_UpstreamAsyncWorkPool;
- WorkerThreadPool m_DownstreamAsyncWorkPool;
- std::thread m_UpstreamUpdatesThread;
- std::thread m_EndpointMonitorThread;
- RunState m_RunState;
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-bool
-UpstreamApply::IsHealthy() const
-{
- return false;
-}
-
-std::unique_ptr<UpstreamApply>
-UpstreamApply::Create(const UpstreamApplyOptions& Options, CidStore& CidStore)
-{
- return std::make_unique<UpstreamApplyImpl>(Options, CidStore);
-}
-
-} // namespace zen
-
-#endif // ZEN_WITH_COMPUTE_SERVICES