diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/upstream/upstreamapply.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver/upstream/upstreamapply.cpp')
| -rw-r--r-- | src/zenserver/upstream/upstreamapply.cpp | 459 |
1 file changed, 459 insertions, 0 deletions
diff --git a/src/zenserver/upstream/upstreamapply.cpp b/src/zenserver/upstream/upstreamapply.cpp new file mode 100644 index 000000000..c719b225d --- /dev/null +++ b/src/zenserver/upstream/upstreamapply.cpp @@ -0,0 +1,459 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "upstreamapply.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/fmtutils.h> +# include <zencore/stream.h> +# include <zencore/timer.h> +# include <zencore/workthreadpool.h> + +# include <zenstore/cidstore.h> + +# include "diag/logging.h" + +# include <fmt/format.h> + +# include <atomic> + +namespace zen { + +using namespace std::literals; + +struct UpstreamApplyStats +{ + static constexpr uint64_t MaxSampleCount = 1000ull; + + UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {} + + void Add(UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result) + { + UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); + + if (Result.Error) + { + Stats.ErrorCount.Increment(1); + } + else if (Result.Success) + { + Stats.PostCount.Increment(1); + Stats.UpBytes.Increment(Result.Bytes / 1024 / 1024); + } + } + + void Add(UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result) + { + UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); + + if (Result.Error) + { + Stats.ErrorCount.Increment(1); + } + else if (Result.Success) + { + Stats.UpdateCount.Increment(1); + Stats.DownBytes.Increment(Result.Bytes / 1024 / 1024); + if (!Result.Completed.empty()) + { + uint64_t Completed = 0; + for (auto& It : Result.Completed) + { + Completed += It.second.size(); + } + Stats.CompleteCount.Increment(Completed); + } + } + } + + bool m_Enabled; +}; + +////////////////////////////////////////////////////////////////////////// + +class UpstreamApplyImpl final : public UpstreamApply +{ +public: + UpstreamApplyImpl(const UpstreamApplyOptions& Options, CidStore& CidStore) + : 
m_Log(logging::Get("upstream-apply")) + , m_Options(Options) + , m_CidStore(CidStore) + , m_Stats(Options.StatsEnabled) + , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount) + , m_DownstreamAsyncWorkPool(Options.DownstreamThreadCount) + { + } + + virtual ~UpstreamApplyImpl() { Shutdown(); } + + virtual bool Initialize() override + { + for (auto& Endpoint : m_Endpoints) + { + const UpstreamEndpointHealth Health = Endpoint->Initialize(); + if (Health.Ok) + { + Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName()); + } + else + { + Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } + + m_RunState.IsRunning = !m_Endpoints.empty(); + + if (m_RunState.IsRunning) + { + m_ShutdownEvent.Reset(); + + m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this); + + m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this); + } + + return m_RunState.IsRunning; + } + + virtual bool IsHealthy() const override + { + if (m_RunState.IsRunning) + { + for (const auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + return true; + } + } + } + + return false; + } + + virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override + { + m_Endpoints.emplace_back(std::move(Endpoint)); + } + + virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override + { + if (m_RunState.IsRunning) + { + const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); + const IoHash ActionId = ApplyRecord.Action.GetHash(); + const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300); + + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + // Already in progress + return {.ApplyId = ActionId, .Success = true}; + } + + std::chrono::steady_clock::time_point ExpireTime = + TimeoutSeconds > 0 ? 
std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds) + : std::chrono::steady_clock::time_point::max(); + + m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)}; + } + + ApplyRecord.Timepoints["zen-queue-added"] = DateTime::NowTicks(); + m_UpstreamAsyncWorkPool.ScheduleWork( + [this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); }); + + return {.ApplyId = ActionId, .Success = true}; + } + + return {}; + } + + virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override + { + if (m_RunState.IsRunning) + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + return {.Status = *Status, .Success = true}; + } + } + + return {}; + } + + virtual void GetStatus(CbObjectWriter& Status) override + { + Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount; + Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWork(); + Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount; + Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWork(); + + Status.BeginArray("endpoints"); + for (const auto& Ep : m_Endpoints) + { + Status.BeginObject(); + Status << "name" << Ep->DisplayName(); + Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); + + UpstreamApplyEndpointStats& Stats = Ep->Stats(); + const uint64_t PostCount = Stats.PostCount.Value(); + const uint64_t CompleteCount = Stats.CompleteCount.Value(); + // const uint64_t UpdateCount = Stats.UpdateCount; + const double CompleteRate = CompleteCount > 0 ? 
(double(PostCount) / double(CompleteCount)) : 0.0; + + Status << "post_count" << PostCount; + Status << "complete_count" << PostCount; + Status << "update_count" << Stats.UpdateCount.Value(); + + Status << "complete_ratio" << CompleteRate; + Status << "downloaded_mb" << Stats.DownBytes.Value(); + Status << "uploaded_mb" << Stats.UpBytes.Value(); + Status << "error_count" << Stats.ErrorCount.Value(); + + Status.EndObject(); + } + Status.EndArray(); + } + +private: + // The caller is responsible for locking if required + UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId) + { + if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end()) + { + if (auto It2 = It->second.find(ActionId); It2 != It->second.end()) + { + return &It2->second; + } + } + return nullptr; + } + + void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord) + { + const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); + const IoHash ActionId = ApplyRecord.Action.GetHash(); + try + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + ApplyRecord.Timepoints["zen-queue-dispatched"] = DateTime::NowTicks(); + PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord)); + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + Status->Timepoints.merge(Result.Timepoints); + + if (Result.Success) + { + Status->State = UpstreamApplyState::Executing; + } + else + { + Status->State = UpstreamApplyState::Complete; + Status->Result = {.Error = std::move(Result.Error), + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds}; + } + } + } + m_Stats.Add(*Endpoint, Result); + return; + } + } + + Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId); + + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + Status->State = 
UpstreamApplyState::Complete; + Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}}; + } + } + } + catch (std::exception& e) + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + Status->State = UpstreamApplyState::Complete; + Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}}; + } + Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what()); + } + } + + void ProcessApplyUpdates() + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_DownstreamAsyncWorkPool); + m_Stats.Add(*Endpoint, Result); + + if (!Result.Success) + { + Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason); + } + + if (!Result.Completed.empty()) + { + for (auto& It : Result.Completed) + { + for (auto& It2 : It.second) + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(It.first, It2.first); Status != nullptr) + { + Status->State = UpstreamApplyState::Complete; + Status->Result = std::move(It2.second); + Status->Result.Timepoints.merge(Status->Timepoints); + Status->Result.Timepoints["zen-queue-complete"] = DateTime::NowTicks(); + Status->Timepoints.clear(); + } + } + } + } + } + } + } + + void ProcessUpstreamUpdates() + { + const auto& UpdateSleep = std::chrono::milliseconds(m_Options.UpdatesInterval); + while (!m_ShutdownEvent.Wait(uint32_t(UpdateSleep.count()))) + { + if (!m_RunState.IsRunning) + { + break; + } + + ProcessApplyUpdates(); + + // Remove any expired tasks, regardless of state + { + std::scoped_lock Lock(m_ApplyTasksMutex); + for (auto& WorkerIt : m_ApplyTasks) + { + const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) { + return Item.second.ExpireTime < std::chrono::steady_clock::now(); + }); + if (Count > 0) + { + Log().debug("Removed '{}' expired tasks", Count); + } + } + const auto Count 
= std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); }); + if (Count > 0) + { + Log().debug("Removed '{}' empty task lists", Count); + } + } + } + } + + void MonitorEndpoints() + { + for (;;) + { + { + std::unique_lock Lock(m_RunState.Mutex); + if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) + { + break; + } + } + + for (auto& Endpoint : m_Endpoints) + { + if (!Endpoint->IsHealthy()) + { + if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + { + Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); + } + else + { + Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } + } + } + } + + void Shutdown() + { + if (m_RunState.Stop()) + { + m_ShutdownEvent.Set(); + m_EndpointMonitorThread.join(); + m_UpstreamUpdatesThread.join(); + m_Endpoints.clear(); + } + } + + spdlog::logger& Log() { return m_Log; } + + struct RunState + { + std::mutex Mutex; + std::condition_variable ExitSignal; + std::atomic_bool IsRunning{false}; + + bool Stop() + { + bool Stopped = false; + { + std::scoped_lock Lock(Mutex); + Stopped = IsRunning.exchange(false); + } + if (Stopped) + { + ExitSignal.notify_all(); + } + return Stopped; + } + }; + + spdlog::logger& m_Log; + UpstreamApplyOptions m_Options; + CidStore& m_CidStore; + UpstreamApplyStats m_Stats; + UpstreamApplyTasks m_ApplyTasks; + std::mutex m_ApplyTasksMutex; + std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints; + Event m_ShutdownEvent; + WorkerThreadPool m_UpstreamAsyncWorkPool; + WorkerThreadPool m_DownstreamAsyncWorkPool; + std::thread m_UpstreamUpdatesThread; + std::thread m_EndpointMonitorThread; + RunState m_RunState; +}; + +////////////////////////////////////////////////////////////////////////// + +bool +UpstreamApply::IsHealthy() const +{ + return false; +} + +std::unique_ptr<UpstreamApply> 
+UpstreamApply::Create(const UpstreamApplyOptions& Options, CidStore& CidStore) +{ + return std::make_unique<UpstreamApplyImpl>(Options, CidStore); +} + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES |