// Copyright Epic Games, Inc. All Rights Reserved. #include "upstreamapply.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include "diag/logging.h" # include # include namespace zen { using namespace std::literals; struct UpstreamApplyStats { static constexpr uint64_t MaxSampleCount = 1000ull; UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {} void Add(UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result) { UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) { Stats.ErrorCount.Increment(1); } else if (Result.Success) { Stats.PostCount.Increment(1); Stats.UpBytes.Increment(Result.Bytes / 1024 / 1024); } } void Add(UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result) { UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) { Stats.ErrorCount.Increment(1); } else if (Result.Success) { Stats.UpdateCount.Increment(1); Stats.DownBytes.Increment(Result.Bytes / 1024 / 1024); if (!Result.Completed.empty()) { uint64_t Completed = 0; for (auto& It : Result.Completed) { Completed += It.second.size(); } Stats.CompleteCount.Increment(Completed); } } } bool m_Enabled; }; ////////////////////////////////////////////////////////////////////////// class UpstreamApplyImpl final : public UpstreamApply { public: UpstreamApplyImpl(const UpstreamApplyOptions& Options, CidStore& CidStore) : m_Log(logging::Get("upstream-apply")) , m_Options(Options) , m_CidStore(CidStore) , m_Stats(Options.StatsEnabled) , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount) , m_DownstreamAsyncWorkPool(Options.DownstreamThreadCount) { } virtual ~UpstreamApplyImpl() { Shutdown(); } virtual bool Initialize() override { for (auto& Endpoint : m_Endpoints) { const UpstreamEndpointHealth Health = Endpoint->Initialize(); if (Health.Ok) { Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName()); } else { Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); } } m_RunState.IsRunning = !m_Endpoints.empty(); if (m_RunState.IsRunning) { m_ShutdownEvent.Reset(); m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this); m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this); } return m_RunState.IsRunning; } virtual bool IsHealthy() const override { if (m_RunState.IsRunning) { for (const auto& Endpoint : m_Endpoints) { if (Endpoint->IsHealthy()) { return true; } } } return false; } virtual void RegisterEndpoint(std::unique_ptr Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override { if (m_RunState.IsRunning) { const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); const IoHash ActionId = ApplyRecord.Action.GetHash(); const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300); { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) { // Already in progress return {.ApplyId = ActionId, .Success = true}; } std::chrono::steady_clock::time_point ExpireTime = TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds) : std::chrono::steady_clock::time_point::max(); m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)}; } ApplyRecord.Timepoints["zen-queue-added"] = DateTime::NowTicks(); m_UpstreamAsyncWorkPool.ScheduleWork( [this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); }); return {.ApplyId = ActionId, .Success = true}; } return {}; } virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override { if (m_RunState.IsRunning) { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) { return {.Status = *Status, .Success = true}; } } return {}; } virtual void GetStatus(CbObjectWriter& Status) override { Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount; Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWork(); Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount; Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWork(); Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) { Status.BeginObject(); Status << "name" << Ep->DisplayName(); Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); UpstreamApplyEndpointStats& Stats = Ep->Stats(); const uint64_t PostCount = Stats.PostCount.Value(); const uint64_t CompleteCount = Stats.CompleteCount.Value(); // const uint64_t UpdateCount = Stats.UpdateCount; const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; Status << "post_count" << PostCount; Status << "complete_count" << PostCount; Status << "update_count" << Stats.UpdateCount.Value(); Status << "complete_ratio" << CompleteRate; Status << "downloaded_mb" << Stats.DownBytes.Value(); Status << "uploaded_mb" << Stats.UpBytes.Value(); Status << "error_count" << Stats.ErrorCount.Value(); Status.EndObject(); } Status.EndArray(); } private: // The caller is responsible for locking if required UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId) { if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end()) { if (auto It2 = It->second.find(ActionId); It2 != It->second.end()) { return &It2->second; } } return nullptr; } void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord) { const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); const IoHash ActionId = ApplyRecord.Action.GetHash(); try { for (auto& Endpoint : m_Endpoints) { if (Endpoint->IsHealthy()) { ApplyRecord.Timepoints["zen-queue-dispatched"] = DateTime::NowTicks(); PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord)); { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) { Status->Timepoints.merge(Result.Timepoints); if (Result.Success) { Status->State = UpstreamApplyState::Executing; } else { Status->State = UpstreamApplyState::Complete; Status->Result = {.Error = std::move(Result.Error), .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds}; } } } m_Stats.Add(*Endpoint, Result); return; } } Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId); { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) { Status->State = UpstreamApplyState::Complete; Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}}; } } } catch (std::exception& e) { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) { Status->State = UpstreamApplyState::Complete; Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}}; } Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what()); } } void ProcessApplyUpdates() { for (auto& Endpoint : m_Endpoints) { if (Endpoint->IsHealthy()) { GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_DownstreamAsyncWorkPool); m_Stats.Add(*Endpoint, Result); if (!Result.Success) { Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason); } if (!Result.Completed.empty()) { for (auto& It : Result.Completed) { for (auto& It2 : It.second) { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(It.first, It2.first); Status != nullptr) { Status->State = UpstreamApplyState::Complete; Status->Result = std::move(It2.second); Status->Result.Timepoints.merge(Status->Timepoints); Status->Result.Timepoints["zen-queue-complete"] = DateTime::NowTicks(); Status->Timepoints.clear(); } } } } } } } void ProcessUpstreamUpdates() { const auto& UpdateSleep = std::chrono::milliseconds(m_Options.UpdatesInterval); while (!m_ShutdownEvent.Wait(uint32_t(UpdateSleep.count()))) { if (!m_RunState.IsRunning) { break; } ProcessApplyUpdates(); // Remove any expired tasks, regardless of state { std::scoped_lock Lock(m_ApplyTasksMutex); for (auto& WorkerIt : m_ApplyTasks) { const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) { return Item.second.ExpireTime < std::chrono::steady_clock::now(); }); if (Count > 0) { Log().debug("Removed '{}' expired tasks", Count); } } const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); }); if (Count > 0) { Log().debug("Removed '{}' empty task lists", Count); } } } } void MonitorEndpoints() { for (;;) { { std::unique_lock Lock(m_RunState.Mutex); if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) { break; } } for (auto& Endpoint : m_Endpoints) { if (!Endpoint->IsHealthy()) { if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) { Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); } else { Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); } } } } } void Shutdown() { if (m_RunState.Stop()) { m_ShutdownEvent.Set(); m_EndpointMonitorThread.join(); m_UpstreamUpdatesThread.join(); m_Endpoints.clear(); } } spdlog::logger& Log() { return m_Log; } struct RunState { std::mutex Mutex; std::condition_variable ExitSignal; std::atomic_bool IsRunning{false}; bool Stop() { bool Stopped = false; { std::scoped_lock Lock(Mutex); Stopped = IsRunning.exchange(false); } if (Stopped) { ExitSignal.notify_all(); } return Stopped; } }; spdlog::logger& m_Log; UpstreamApplyOptions m_Options; CidStore& m_CidStore; UpstreamApplyStats m_Stats; UpstreamApplyTasks m_ApplyTasks; std::mutex m_ApplyTasksMutex; std::vector> m_Endpoints; Event m_ShutdownEvent; WorkerThreadPool m_UpstreamAsyncWorkPool; WorkerThreadPool m_DownstreamAsyncWorkPool; std::thread m_UpstreamUpdatesThread; std::thread m_EndpointMonitorThread; RunState m_RunState; }; ////////////////////////////////////////////////////////////////////////// bool UpstreamApply::IsHealthy() const { return false; } std::unique_ptr UpstreamApply::Create(const UpstreamApplyOptions& Options, CidStore& CidStore) { return std::make_unique(Options, CidStore); } } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES