diff options
| author | Stefan Boberg <[email protected]> | 2023-10-11 10:03:54 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-11 10:03:54 +0200 |
| commit | 1ad940fafb5e3eae7b308dd290b6de6ade69a3eb (patch) | |
| tree | 1d1efe188f45bc422292e75c6784929765882771 /src/zenserver/upstream/upstreamapply.cpp | |
| parent | fix clang-format whoopsie (diff) | |
| download | zen-1ad940fafb5e3eae7b308dd290b6de6ade69a3eb.tar.xz zen-1ad940fafb5e3eae7b308dd290b6de6ade69a3eb.zip | |
remove legacy compute interfaces (#461)
* removed legacy compute code, which will be replaced with a new implementation in the future
* also updated references to Jupiter storage
Diffstat (limited to 'src/zenserver/upstream/upstreamapply.cpp')
| -rw-r--r-- | src/zenserver/upstream/upstreamapply.cpp | 459 |
1 files changed, 0 insertions, 459 deletions
diff --git a/src/zenserver/upstream/upstreamapply.cpp b/src/zenserver/upstream/upstreamapply.cpp deleted file mode 100644 index 3d29f2228..000000000 --- a/src/zenserver/upstream/upstreamapply.cpp +++ /dev/null @@ -1,459 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "upstreamapply.h" - -#if ZEN_WITH_COMPUTE_SERVICES - -# include <zencore/compactbinary.h> -# include <zencore/compactbinarybuilder.h> -# include <zencore/fmtutils.h> -# include <zencore/stream.h> -# include <zencore/timer.h> -# include <zencore/workthreadpool.h> - -# include <zenstore/cidstore.h> - -# include "diag/logging.h" - -# include <fmt/format.h> - -# include <atomic> - -namespace zen { - -using namespace std::literals; - -struct UpstreamApplyStats -{ - static constexpr uint64_t MaxSampleCount = 1000ull; - - UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {} - - void Add(UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result) - { - UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); - - if (Result.Error) - { - Stats.ErrorCount.Increment(1); - } - else if (Result.Success) - { - Stats.PostCount.Increment(1); - Stats.UpBytes.Increment(Result.Bytes / 1024 / 1024); - } - } - - void Add(UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result) - { - UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); - - if (Result.Error) - { - Stats.ErrorCount.Increment(1); - } - else if (Result.Success) - { - Stats.UpdateCount.Increment(1); - Stats.DownBytes.Increment(Result.Bytes / 1024 / 1024); - if (!Result.Completed.empty()) - { - uint64_t Completed = 0; - for (auto& It : Result.Completed) - { - Completed += It.second.size(); - } - Stats.CompleteCount.Increment(Completed); - } - } - } - - bool m_Enabled; -}; - -////////////////////////////////////////////////////////////////////////// - -class UpstreamApplyImpl final : public UpstreamApply -{ -public: - UpstreamApplyImpl(const UpstreamApplyOptions& Options, CidStore& CidStore) - : m_Log(logging::Get("upstream-apply")) - , m_Options(Options) - , m_CidStore(CidStore) - , m_Stats(Options.StatsEnabled) - , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount) - , m_DownstreamAsyncWorkPool(Options.DownstreamThreadCount) - { - } - - virtual ~UpstreamApplyImpl() { Shutdown(); } - - virtual bool Initialize() override - { - for (auto& Endpoint : m_Endpoints) - { - const UpstreamEndpointHealth Health = Endpoint->Initialize(); - if (Health.Ok) - { - Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName()); - } - else - { - Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); - } - } - - m_RunState.IsRunning = !m_Endpoints.empty(); - - if (m_RunState.IsRunning) - { - m_ShutdownEvent.Reset(); - - m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this); - - m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this); - } - - return m_RunState.IsRunning; - } - - virtual bool IsHealthy() const override - { - if (m_RunState.IsRunning) - { - for (const auto& Endpoint : m_Endpoints) - { - if (Endpoint->IsHealthy()) - { - return true; - } - } - } - - return false; - } - - virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override - { - m_Endpoints.emplace_back(std::move(Endpoint)); - } - - virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override - { - if (m_RunState.IsRunning) - { - const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); - const IoHash ActionId = ApplyRecord.Action.GetHash(); - const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300); - - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) - { - // Already in progress - return {.ApplyId = ActionId, .Success = true}; - } - - std::chrono::steady_clock::time_point ExpireTime = - TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds) - : std::chrono::steady_clock::time_point::max(); - - m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)}; - } - - ApplyRecord.Timepoints["zen-queue-added"] = DateTime::NowTicks(); - m_UpstreamAsyncWorkPool.ScheduleWork( - [this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); }); - - return {.ApplyId = ActionId, .Success = true}; - } - - return {}; - } - - virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override - { - if (m_RunState.IsRunning) - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) - { - return {.Status = *Status, .Success = true}; - } - } - - return {}; - } - - virtual void GetStatus(CbObjectWriter& Status) override - { - Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount; - Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWorkItemCount(); - Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount; - Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWorkItemCount(); - - Status.BeginArray("endpoints"); - for (const auto& Ep : m_Endpoints) - { - Status.BeginObject(); - Status << "name" << Ep->DisplayName(); - Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); - - UpstreamApplyEndpointStats& Stats = Ep->Stats(); - const uint64_t PostCount = Stats.PostCount.Value(); - const uint64_t CompleteCount = Stats.CompleteCount.Value(); - // const uint64_t UpdateCount = Stats.UpdateCount; - const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; - - Status << "post_count" << PostCount; - Status << "complete_count" << PostCount; - Status << "update_count" << Stats.UpdateCount.Value(); - - Status << "complete_ratio" << CompleteRate; - Status << "downloaded_mb" << Stats.DownBytes.Value(); - Status << "uploaded_mb" << Stats.UpBytes.Value(); - Status << "error_count" << Stats.ErrorCount.Value(); - - Status.EndObject(); - } - Status.EndArray(); - } - -private: - // The caller is responsible for locking if required - UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId) - { - if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end()) - { - if (auto It2 = It->second.find(ActionId); It2 != It->second.end()) - { - return &It2->second; - } - } - return nullptr; - } - - void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord) - { - const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); - const IoHash ActionId = ApplyRecord.Action.GetHash(); - try - { - for (auto& Endpoint : m_Endpoints) - { - if (Endpoint->IsHealthy()) - { - ApplyRecord.Timepoints["zen-queue-dispatched"] = DateTime::NowTicks(); - PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord)); - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) - { - Status->Timepoints.merge(Result.Timepoints); - - if (Result.Success) - { - Status->State = UpstreamApplyState::Executing; - } - else - { - Status->State = UpstreamApplyState::Complete; - Status->Result = {.Error = std::move(Result.Error), - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds}; - } - } - } - m_Stats.Add(*Endpoint, Result); - return; - } - } - - Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId); - - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) - { - Status->State = UpstreamApplyState::Complete; - Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}}; - } - } - } - catch (std::exception& e) - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) - { - Status->State = UpstreamApplyState::Complete; - Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}}; - } - Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what()); - } - } - - void ProcessApplyUpdates() - { - for (auto& Endpoint : m_Endpoints) - { - if (Endpoint->IsHealthy()) - { - GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_DownstreamAsyncWorkPool); - m_Stats.Add(*Endpoint, Result); - - if (!Result.Success) - { - Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason); - } - - if (!Result.Completed.empty()) - { - for (auto& It : Result.Completed) - { - for (auto& It2 : It.second) - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(It.first, It2.first); Status != nullptr) - { - Status->State = UpstreamApplyState::Complete; - Status->Result = std::move(It2.second); - Status->Result.Timepoints.merge(Status->Timepoints); - Status->Result.Timepoints["zen-queue-complete"] = DateTime::NowTicks(); - Status->Timepoints.clear(); - } - } - } - } - } - } - } - - void ProcessUpstreamUpdates() - { - const auto& UpdateSleep = std::chrono::milliseconds(m_Options.UpdatesInterval); - while (!m_ShutdownEvent.Wait(uint32_t(UpdateSleep.count()))) - { - if (!m_RunState.IsRunning) - { - break; - } - - ProcessApplyUpdates(); - - // Remove any expired tasks, regardless of state - { - std::scoped_lock Lock(m_ApplyTasksMutex); - for (auto& WorkerIt : m_ApplyTasks) - { - const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) { - return Item.second.ExpireTime < std::chrono::steady_clock::now(); - }); - if (Count > 0) - { - Log().debug("Removed '{}' expired tasks", Count); - } - } - const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); }); - if (Count > 0) - { - Log().debug("Removed '{}' empty task lists", Count); - } - } - } - } - - void MonitorEndpoints() - { - for (;;) - { - { - std::unique_lock Lock(m_RunState.Mutex); - if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) - { - break; - } - } - - for (auto& Endpoint : m_Endpoints) - { - if (!Endpoint->IsHealthy()) - { - if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) - { - Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); - } - else - { - Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); - } - } - } - } - } - - void Shutdown() - { - if (m_RunState.Stop()) - { - m_ShutdownEvent.Set(); - m_EndpointMonitorThread.join(); - m_UpstreamUpdatesThread.join(); - m_Endpoints.clear(); - } - } - - spdlog::logger& Log() { return m_Log; } - - struct RunState - { - std::mutex Mutex; - std::condition_variable ExitSignal; - std::atomic_bool IsRunning{false}; - - bool Stop() - { - bool Stopped = false; - { - std::scoped_lock Lock(Mutex); - Stopped = IsRunning.exchange(false); - } - if (Stopped) - { - ExitSignal.notify_all(); - } - return Stopped; - } - }; - - spdlog::logger& m_Log; - UpstreamApplyOptions m_Options; - CidStore& m_CidStore; - UpstreamApplyStats m_Stats; - UpstreamApplyTasks m_ApplyTasks; - std::mutex m_ApplyTasksMutex; - std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints; - Event m_ShutdownEvent; - WorkerThreadPool m_UpstreamAsyncWorkPool; - WorkerThreadPool m_DownstreamAsyncWorkPool; - std::thread m_UpstreamUpdatesThread; - std::thread m_EndpointMonitorThread; - RunState m_RunState; -}; - -////////////////////////////////////////////////////////////////////////// - -bool -UpstreamApply::IsHealthy() const -{ - return false; -} - -std::unique_ptr<UpstreamApply> -UpstreamApply::Create(const UpstreamApplyOptions& Options, CidStore& CidStore) -{ - return std::make_unique<UpstreamApplyImpl>(Options, CidStore); -} - -} // namespace zen - -#endif // ZEN_WITH_COMPUTE_SERVICES |