// Copyright Epic Games, Inc. All Rights Reserved.

// NOTE(review): the targets of the bare include directives below, and the
// template arguments flagged further down, are not visible in this view of the
// file. They are left exactly as found - restore them from the original source.
#include

#if ZEN_WITH_COMPUTE_SERVICES

# include
# include
# include
# include

# include "timeline/workertimeline.h"

namespace zen::compute {

// Creates the timeline store rooted at `DataDir / "timelines"` and starts the
// background thread that runs ProbeThreadFunction().
// NOTE(review): the template argument of std::make_unique is not visible here.
OrchestratorService::OrchestratorService(std::filesystem::path DataDir) : m_TimelineStore(std::make_unique(DataDir / "timelines"))
{
    m_ProbeThread = std::thread{&OrchestratorService::ProbeThreadFunction, this};
}

// Stops the probe thread: clears the enabled flag, signals the event the thread
// waits on between rounds, and joins it. The thread only checks the flag
// between workers, so the join can wait for an in-flight probe request to
// complete or time out.
OrchestratorService::~OrchestratorService()
{
    m_ProbeThreadEnabled = false;
    m_ProbeThreadEvent.Set();
    if (m_ProbeThread.joinable())
    {
        m_ProbeThread.join();
    }
}

// Serialises every entry of m_KnownWorkers into an object holding a "workers"
// array. The map is read under the shared lock.
//
// Each array element carries: id, uri, hostname, cpus, cpu_usage, memory_total,
// memory_used, dt (value of LastSeen.GetElapsedTimeMs()), and - only when the
// reachability state is not Unknown - a boolean "reachable".
CbObject OrchestratorService::GetWorkerList()
{
    ZEN_TRACE_CPU("OrchestratorService::GetWorkerList");

    CbObjectWriter Cbo;
    Cbo.BeginArray("workers");

    m_KnownWorkersLock.WithSharedLock([&] {
        for (const auto& [WorkerId, Worker] : m_KnownWorkers)
        {
            Cbo.BeginObject();
            Cbo << "id" << WorkerId;
            Cbo << "uri" << Worker.BaseUri;
            Cbo << "hostname" << Worker.Hostname;
            Cbo << "cpus" << Worker.Cpus;
            Cbo << "cpu_usage" << Worker.CpuUsagePercent;
            Cbo << "memory_total" << Worker.MemoryTotalBytes;
            Cbo << "memory_used" << Worker.MemoryUsedBytes;

            // A worker that has not been probed yet gets no "reachable" field
            // at all, rather than a misleading true/false.
            if (Worker.Reachable != ReachableState::Unknown)
            {
                Cbo << "reachable" << (Worker.Reachable == ReachableState::Reachable);
            }

            Cbo << "dt" << Worker.LastSeen.GetElapsedTimeMs();
            Cbo.EndObject();
        }
    });

    Cbo.EndArray();
    return Cbo.Save();
}

// Records an announcement from a worker: copies the announced URI, hostname,
// CPU and memory figures into the entry keyed by Ann.Id and resets its
// LastSeen timer. Runs under the exclusive lock.
//
// The Reachable and LastProbed fields are not touched here; they are written
// only by ProbeThreadFunction().
void OrchestratorService::AnnounceWorker(const WorkerAnnouncement& Ann)
{
    ZEN_TRACE_CPU("OrchestratorService::AnnounceWorker");

    m_KnownWorkersLock.WithExclusiveLock([&] {
        // presumably operator[] default-inserts an entry for a first-time
        // worker id - confirm against the container type of m_KnownWorkers
        auto& Worker = m_KnownWorkers[std::string(Ann.Id)];
        Worker.BaseUri = Ann.Uri;
        Worker.Hostname = Ann.Hostname;
        Worker.Cpus = Ann.Cpus;
        Worker.CpuUsagePercent = Ann.CpuUsagePercent;
        Worker.MemoryTotalBytes = Ann.MemoryTotalBytes;
        Worker.MemoryUsedBytes = Ann.MemoryUsedBytes;
        Worker.LastSeen.Reset();
    });
}

// Returns the timeline of one worker as an object.
//
// Selection of events:
//   - FromStr and/or ToStr non-empty: events between the two bounds; a missing
//     lower bound becomes DateTime(0), a missing upper bound DateTime::Now().
//     LimitStr is ignored in this case.
//   - otherwise, LimitStr non-empty: QueryRecent(Limit).
//   - otherwise: QueryRecent() with its default.
//
// Returns a default-constructed CbObject when the store has no timeline for
// WorkerId.
//
// std::stoull throws std::invalid_argument / std::out_of_range on malformed or
// out-of-range FromStr / ToStr, and nothing here catches it.
// NOTE(review): confirm the caller handles these exceptions.
CbObject OrchestratorService::GetWorkerTimeline(std::string_view WorkerId, std::string_view FromStr, std::string_view ToStr, std::string_view LimitStr)
{
    ZEN_TRACE_CPU("OrchestratorService::GetWorkerTimeline");

    // NOTE(review): the template argument of Ref is not visible here.
    Ref Timeline = m_TimelineStore->Find(WorkerId);
    if (!Timeline)
    {
        return {};
    }

    // NOTE(review): the element type of this vector is not visible here.
    std::vector Events;

    if (!FromStr.empty() || !ToStr.empty())
    {
        // The parsed integers are handed straight to the DateTime constructor;
        // presumably they are tick counts - TODO confirm the expected unit.
        DateTime StartTime = !FromStr.empty() ? DateTime(std::stoull(std::string(FromStr))) : DateTime(0);
        DateTime EndTime = !ToStr.empty() ? DateTime(std::stoull(std::string(ToStr))) : DateTime::Now();
        Events = Timeline->QueryTimeline(StartTime, EndTime);
    }
    else if (!LimitStr.empty())
    {
        // std::atoi reports no errors: non-numeric text yields 0 and a leading
        // '-' yields a negative limit.
        // NOTE(review): confirm QueryRecent copes with zero / negative values.
        int Limit = std::atoi(std::string(LimitStr).c_str());
        Events = Timeline->QueryRecent(Limit);
    }
    else
    {
        Events = Timeline->QueryRecent();
    }

    WorkerTimeline::TimeRange Range = Timeline->GetTimeRange();

    CbObjectWriter Cbo;
    Cbo << "worker_id" << WorkerId;
    // event_count is the timeline's total from GetEventCount(), not the size of
    // the Events selection written below.
    // NOTE(review): the target type of this static_cast is not visible here.
    Cbo << "event_count" << static_cast(Timeline->GetEventCount());

    // The overall time range is written only when Range tests true.
    if (Range)
    {
        Cbo.AddDateTime("time_first", Range.First);
        Cbo.AddDateTime("time_last", Range.Last);
    }

    Cbo.BeginArray("events");
    for (const auto& Evt : Events)
    {
        Cbo.BeginObject();
        Cbo << "type" << WorkerTimeline::ToString(Evt.Type);
        Cbo.AddDateTime("ts", Evt.Timestamp);

        // Action identifiers are written only when the event has a non-zero LSN.
        if (Evt.ActionLsn != 0)
        {
            Cbo << "lsn" << Evt.ActionLsn;
            Cbo << "action_id" << Evt.ActionId;
        }

        // State-change events additionally carry the previous and new state.
        if (Evt.Type == WorkerTimeline::EventType::ActionStateChanged)
        {
            Cbo << "prev_state" << RunnerAction::ToString(Evt.PreviousState);
            Cbo << "state" << RunnerAction::ToString(Evt.ActionState);
        }

        if (!Evt.Reason.empty())
        {
            Cbo << "reason" << std::string_view(Evt.Reason);
        }

        Cbo.EndObject();
    }
    Cbo.EndArray();

    return Cbo.Save();
}

// Lists the workers whose recorded time range overlaps [FromStr, ToStr].
//
// An empty FromStr means DateTime(0); an empty ToStr means DateTime::Now().
// The result holds the effective "from"/"to" bounds and a "workers" array with
// worker_id, time_first and time_last per matching worker.
//
// std::stoull throws std::invalid_argument / std::out_of_range on malformed or
// out-of-range input, and nothing here catches it.
// NOTE(review): confirm the caller handles these exceptions.
CbObject OrchestratorService::GetAllTimelines(std::string_view FromStr, std::string_view ToStr)
{
    ZEN_TRACE_CPU("OrchestratorService::GetAllTimelines");

    DateTime StartTime = !FromStr.empty() ? DateTime(std::stoull(std::string(FromStr))) : DateTime(0);
    DateTime EndTime = !ToStr.empty() ? DateTime(std::stoull(std::string(ToStr))) : DateTime::Now();

    auto AllInfo = m_TimelineStore->GetAllWorkerInfo();

    CbObjectWriter Cbo;
    Cbo.AddDateTime("from", StartTime);
    Cbo.AddDateTime("to", EndTime);

    Cbo.BeginArray("workers");
    for (const auto& Info : AllInfo)
    {
        // Skip workers whose range tests false, ends before the window starts,
        // or begins after the window ends.
        if (!Info.Range || Info.Range.Last < StartTime || Info.Range.First > EndTime)
        {
            continue;
        }

        Cbo.BeginObject();
        Cbo << "worker_id" << Info.WorkerId;
        Cbo.AddDateTime("time_first", Info.Range.First);
        Cbo.AddDateTime("time_last", Info.Range.Last);
        Cbo.EndObject();
    }
    Cbo.EndArray();

    return Cbo.Save();
}

// Body of the probe thread started by the constructor.
//
// Each round copies the id and base URI of every known worker, issues a GET
// for "/health/" against each one without holding the lock, and writes the
// resulting reachability state back into m_KnownWorkers. The first round runs
// immediately; later rounds start after m_ProbeThreadEvent.Wait(5'000)
// returns. The function returns once m_ProbeThreadEnabled is false, which is
// checked after the wait, before each worker, and at the end of each round.
void OrchestratorService::ProbeThreadFunction()
{
    ZEN_TRACE_CPU("OrchestratorService::ProbeThreadFunction");
    SetCurrentThreadName("orch_probe");

    bool IsFirstProbe = true;

    do
    {
        if (!IsFirstProbe)
        {
            // presumably a 5000 ms timeout - TODO confirm the unit Wait() takes.
            // The destructor sets the event so shutdown is not delayed by it.
            m_ProbeThreadEvent.Wait(5'000);
            m_ProbeThreadEvent.Reset();
        }
        else
        {
            IsFirstProbe = false;
        }

        if (m_ProbeThreadEnabled == false)
        {
            return;
        }

        // On every round after the first this repeats the Reset() done right
        // after the wait above.
        m_ProbeThreadEvent.Reset();

        // Snapshot worker IDs and URIs under shared lock
        struct WorkerSnapshot
        {
            std::string Id;
            std::string Uri;
        };

        // NOTE(review): the element type of this vector is not visible here.
        std::vector Snapshots;

        m_KnownWorkersLock.WithSharedLock([&] {
            Snapshots.reserve(m_KnownWorkers.size());
            for (const auto& [WorkerId, Worker] : m_KnownWorkers)
            {
                Snapshots.push_back({WorkerId, Worker.BaseUri});
            }
        });

        // Probe each worker outside the lock
        for (const auto& Snap : Snapshots)
        {
            if (m_ProbeThreadEnabled == false)
            {
                return;
            }

            // Unreachable unless the health request succeeds: a non-success
            // response and a thrown std::exception both leave this value.
            ReachableState NewState = ReachableState::Unreachable;

            try
            {
                HttpClient Client(Snap.Uri, {.ConnectTimeout = std::chrono::milliseconds{3000}, .Timeout = std::chrono::milliseconds{5000}});
                HttpClient::Response Response = Client.Get("/health/");
                if (Response.IsSuccess())
                {
                    NewState = ReachableState::Reachable;
                }
            }
            catch (const std::exception& Ex)
            {
                ZEN_WARN("probe failed for worker {} ({}): {}", Snap.Id, Snap.Uri, Ex.what());
            }

            m_KnownWorkersLock.WithExclusiveLock([&] {
                // The entry is looked up again by id; if it is no longer in the
                // map the result is dropped.
                // NOTE(review): the lookup is by id only, so a result obtained
                // from Snap.Uri is still recorded if AnnounceWorker changed the
                // worker's BaseUri while the request was in flight.
                auto It = m_KnownWorkers.find(Snap.Id);
                if (It != m_KnownWorkers.end())
                {
                    ReachableState PrevState = It->second.Reachable;
                    It->second.Reachable = NewState;
                    It->second.LastProbed.Reset();

                    // Log only on a state change; the message depends on which
                    // transition happened.
                    if (PrevState != NewState)
                    {
                        if (NewState == ReachableState::Reachable && PrevState == ReachableState::Unreachable)
                        {
                            ZEN_INFO("worker {} ({}) is reachable again", Snap.Id, Snap.Uri);
                        }
                        else if (NewState == ReachableState::Reachable)
                        {
                            ZEN_INFO("worker {} ({}) is now reachable", Snap.Id, Snap.Uri);
                        }
                        else if (PrevState == ReachableState::Reachable)
                        {
                            ZEN_WARN("worker {} ({}) is no longer reachable", Snap.Id, Snap.Uri);
                        }
                        else
                        {
                            ZEN_WARN("worker {} ({}) is not reachable", Snap.Id, Snap.Uri);
                        }
                    }
                }
            });
        }
    } while (m_ProbeThreadEnabled);
}

} // namespace zen::compute

#endif