// Copyright Epic Games, Inc. All Rights Reserved. #include #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include "timeline/workertimeline.h" namespace zen::compute { OrchestratorService::OrchestratorService(std::filesystem::path DataDir, bool EnableWorkerWebSocket) : m_TimelineStore(std::make_unique(DataDir / "timelines")) , m_EnableWorkerWebSocket(EnableWorkerWebSocket) { m_ProbeThread = std::thread{&OrchestratorService::ProbeThreadFunction, this}; } OrchestratorService::~OrchestratorService() { m_ProbeThreadEnabled = false; m_ProbeThreadEvent.Set(); if (m_ProbeThread.joinable()) { m_ProbeThread.join(); } } CbObject OrchestratorService::GetWorkerList() { ZEN_TRACE_CPU("OrchestratorService::GetWorkerList"); CbObjectWriter Cbo; Cbo.BeginArray("workers"); m_KnownWorkersLock.WithSharedLock([&] { for (const auto& [WorkerId, Worker] : m_KnownWorkers) { Cbo.BeginObject(); Cbo << "id" << WorkerId; Cbo << "uri" << Worker.BaseUri; Cbo << "hostname" << Worker.Hostname; if (!Worker.Platform.empty()) { Cbo << "platform" << std::string_view(Worker.Platform); } Cbo << "cpus" << Worker.Cpus; Cbo << "cpu_usage" << Worker.CpuUsagePercent; Cbo << "memory_total" << Worker.MemoryTotalBytes; Cbo << "memory_used" << Worker.MemoryUsedBytes; Cbo << "bytes_received" << Worker.BytesReceived; Cbo << "bytes_sent" << Worker.BytesSent; Cbo << "actions_pending" << Worker.ActionsPending; Cbo << "actions_running" << Worker.ActionsRunning; Cbo << "actions_completed" << Worker.ActionsCompleted; Cbo << "active_queues" << Worker.ActiveQueues; if (!Worker.Provisioner.empty()) { Cbo << "provisioner" << std::string_view(Worker.Provisioner); } if (Worker.Reachable != ReachableState::Unknown) { Cbo << "reachable" << (Worker.Reachable == ReachableState::Reachable); } if (Worker.WsConnected) { Cbo << "ws_connected" << true; } Cbo << "dt" << Worker.LastSeen.GetElapsedTimeMs(); Cbo.EndObject(); } }); Cbo.EndArray(); return Cbo.Save(); } void OrchestratorService::AnnounceWorker(const WorkerAnnouncement& Ann) { ZEN_TRACE_CPU("OrchestratorService::AnnounceWorker"); bool IsNew = false; std::string EvictedId; std::string EvictedHostname; m_KnownWorkersLock.WithExclusiveLock([&] { IsNew = (m_KnownWorkers.find(std::string(Ann.Id)) == m_KnownWorkers.end()); // If a different worker ID already maps to the same URI, the old entry // is stale (e.g. a previous Horde lease on the same machine). Remove it // so the dashboard doesn't show duplicates. if (IsNew) { for (auto It = m_KnownWorkers.begin(); It != m_KnownWorkers.end(); ++It) { if (It->second.BaseUri == Ann.Uri && It->first != Ann.Id) { EvictedId = It->first; EvictedHostname = It->second.Hostname; m_KnownWorkers.erase(It); break; } } } auto& Worker = m_KnownWorkers[std::string(Ann.Id)]; Worker.BaseUri = Ann.Uri; Worker.Hostname = Ann.Hostname; if (!Ann.Platform.empty()) { Worker.Platform = Ann.Platform; } Worker.Cpus = Ann.Cpus; Worker.CpuUsagePercent = Ann.CpuUsagePercent; Worker.MemoryTotalBytes = Ann.MemoryTotalBytes; Worker.MemoryUsedBytes = Ann.MemoryUsedBytes; Worker.BytesReceived = Ann.BytesReceived; Worker.BytesSent = Ann.BytesSent; Worker.ActionsPending = Ann.ActionsPending; Worker.ActionsRunning = Ann.ActionsRunning; Worker.ActionsCompleted = Ann.ActionsCompleted; Worker.ActiveQueues = Ann.ActiveQueues; if (!Ann.Provisioner.empty()) { Worker.Provisioner = Ann.Provisioner; } Worker.LastSeen.Reset(); }); if (!EvictedId.empty()) { ZEN_INFO("worker {} superseded by {} (same endpoint)", EvictedId, Ann.Id); RecordProvisioningEvent(ProvisioningEvent::Type::Left, EvictedId, EvictedHostname); } if (IsNew) { RecordProvisioningEvent(ProvisioningEvent::Type::Joined, Ann.Id, Ann.Hostname); } } bool OrchestratorService::IsWorkerWebSocketEnabled() const { return m_EnableWorkerWebSocket; } void OrchestratorService::SetWorkerWebSocketConnected(std::string_view WorkerId, bool Connected) { ReachableState PrevState = ReachableState::Unknown; std::string WorkerHostname; m_KnownWorkersLock.WithExclusiveLock([&] { auto It = m_KnownWorkers.find(std::string(WorkerId)); if (It == m_KnownWorkers.end()) { return; } PrevState = It->second.Reachable; WorkerHostname = It->second.Hostname; It->second.WsConnected = Connected; It->second.Reachable = Connected ? ReachableState::Reachable : ReachableState::Unreachable; if (Connected) { ZEN_INFO("worker {} WebSocket connected — marking reachable", WorkerId); } else { ZEN_WARN("worker {} WebSocket disconnected — marking unreachable", WorkerId); } }); // Record provisioning events for state transitions outside the lock if (Connected && PrevState == ReachableState::Unreachable) { RecordProvisioningEvent(ProvisioningEvent::Type::Returned, WorkerId, WorkerHostname); } else if (!Connected && PrevState == ReachableState::Reachable) { RecordProvisioningEvent(ProvisioningEvent::Type::Left, WorkerId, WorkerHostname); } } CbObject OrchestratorService::GetWorkerTimeline(std::string_view WorkerId, std::optional From, std::optional To, int Limit) { ZEN_TRACE_CPU("OrchestratorService::GetWorkerTimeline"); Ref Timeline = m_TimelineStore->Find(WorkerId); if (!Timeline) { return {}; } std::vector Events; if (From || To) { DateTime StartTime = From.value_or(DateTime(0)); DateTime EndTime = To.value_or(DateTime::Now()); Events = Timeline->QueryTimeline(StartTime, EndTime); } else if (Limit > 0) { Events = Timeline->QueryRecent(Limit); } else { Events = Timeline->QueryRecent(); } WorkerTimeline::TimeRange Range = Timeline->GetTimeRange(); CbObjectWriter Cbo; Cbo << "worker_id" << WorkerId; Cbo << "event_count" << static_cast(Timeline->GetEventCount()); if (Range) { Cbo.AddDateTime("time_first", Range.First); Cbo.AddDateTime("time_last", Range.Last); } Cbo.BeginArray("events"); for (const auto& Evt : Events) { Cbo.BeginObject(); Cbo << "type" << WorkerTimeline::ToString(Evt.Type); Cbo.AddDateTime("ts", Evt.Timestamp); if (Evt.ActionLsn != 0) { Cbo << "lsn" << Evt.ActionLsn; Cbo << "action_id" << Evt.ActionId; } if (Evt.Type == WorkerTimeline::EventType::ActionStateChanged) { Cbo << "prev_state" << RunnerAction::ToString(Evt.PreviousState); Cbo << "state" << RunnerAction::ToString(Evt.ActionState); } if (!Evt.Reason.empty()) { Cbo << "reason" << std::string_view(Evt.Reason); } Cbo.EndObject(); } Cbo.EndArray(); return Cbo.Save(); } CbObject OrchestratorService::GetAllTimelines(DateTime From, DateTime To) { ZEN_TRACE_CPU("OrchestratorService::GetAllTimelines"); DateTime StartTime = From; DateTime EndTime = To; auto AllInfo = m_TimelineStore->GetAllWorkerInfo(); CbObjectWriter Cbo; Cbo.AddDateTime("from", StartTime); Cbo.AddDateTime("to", EndTime); Cbo.BeginArray("workers"); for (const auto& Info : AllInfo) { if (!Info.Range || Info.Range.Last < StartTime || Info.Range.First > EndTime) { continue; } Cbo.BeginObject(); Cbo << "worker_id" << Info.WorkerId; Cbo.AddDateTime("time_first", Info.Range.First); Cbo.AddDateTime("time_last", Info.Range.Last); Cbo.EndObject(); } Cbo.EndArray(); return Cbo.Save(); } void OrchestratorService::RecordProvisioningEvent(ProvisioningEvent::Type Type, std::string_view WorkerId, std::string_view Hostname) { ProvisioningEvent Evt{ .EventType = Type, .Timestamp = DateTime::Now(), .WorkerId = std::string(WorkerId), .Hostname = std::string(Hostname), }; m_ProvisioningLogLock.WithExclusiveLock([&] { m_ProvisioningLog.push_back(std::move(Evt)); while (m_ProvisioningLog.size() > kMaxProvisioningEvents) { m_ProvisioningLog.pop_front(); } }); } CbObject OrchestratorService::GetProvisioningHistory(int Limit) { ZEN_TRACE_CPU("OrchestratorService::GetProvisioningHistory"); if (Limit <= 0) { Limit = 100; } CbObjectWriter Cbo; Cbo.BeginArray("events"); m_ProvisioningLogLock.WithSharedLock([&] { // Return last N events, newest first int Count = 0; for (auto It = m_ProvisioningLog.rbegin(); It != m_ProvisioningLog.rend() && Count < Limit; ++It, ++Count) { const auto& Evt = *It; Cbo.BeginObject(); switch (Evt.EventType) { case ProvisioningEvent::Type::Joined: Cbo << "type" << "joined"; break; case ProvisioningEvent::Type::Left: Cbo << "type" << "left"; break; case ProvisioningEvent::Type::Returned: Cbo << "type" << "returned"; break; } Cbo.AddDateTime("ts", Evt.Timestamp); Cbo << "worker_id" << std::string_view(Evt.WorkerId); Cbo << "hostname" << std::string_view(Evt.Hostname); Cbo.EndObject(); } }); Cbo.EndArray(); return Cbo.Save(); } std::string OrchestratorService::AnnounceClient(const ClientAnnouncement& Ann) { ZEN_TRACE_CPU("OrchestratorService::AnnounceClient"); std::string ClientId = fmt::format("client-{}", Oid::NewOid().ToString()); bool IsNew = false; m_KnownClientsLock.WithExclusiveLock([&] { auto It = m_KnownClients.find(ClientId); IsNew = (It == m_KnownClients.end()); auto& Client = m_KnownClients[ClientId]; Client.SessionId = Ann.SessionId; Client.Hostname = Ann.Hostname; if (!Ann.Address.empty()) { Client.Address = Ann.Address; } if (Ann.Metadata) { Client.Metadata = Ann.Metadata; } Client.LastSeen.Reset(); }); if (IsNew) { RecordClientEvent(ClientEvent::Type::Connected, ClientId, Ann.Hostname); } else { RecordClientEvent(ClientEvent::Type::Updated, ClientId, Ann.Hostname); } return ClientId; } bool OrchestratorService::UpdateClient(std::string_view ClientId, CbObject Metadata) { ZEN_TRACE_CPU("OrchestratorService::UpdateClient"); bool Found = false; m_KnownClientsLock.WithExclusiveLock([&] { auto It = m_KnownClients.find(std::string(ClientId)); if (It != m_KnownClients.end()) { Found = true; if (Metadata) { It->second.Metadata = std::move(Metadata); } It->second.LastSeen.Reset(); } }); return Found; } bool OrchestratorService::CompleteClient(std::string_view ClientId) { ZEN_TRACE_CPU("OrchestratorService::CompleteClient"); std::string Hostname; bool Found = false; m_KnownClientsLock.WithExclusiveLock([&] { auto It = m_KnownClients.find(std::string(ClientId)); if (It != m_KnownClients.end()) { Found = true; Hostname = It->second.Hostname; m_KnownClients.erase(It); } }); if (Found) { RecordClientEvent(ClientEvent::Type::Disconnected, ClientId, Hostname); } return Found; } CbObject OrchestratorService::GetClientList() { ZEN_TRACE_CPU("OrchestratorService::GetClientList"); CbObjectWriter Cbo; Cbo.BeginArray("clients"); m_KnownClientsLock.WithSharedLock([&] { for (const auto& [ClientId, Client] : m_KnownClients) { Cbo.BeginObject(); Cbo << "id" << ClientId; if (Client.SessionId) { Cbo << "session_id" << Client.SessionId; } Cbo << "hostname" << std::string_view(Client.Hostname); if (!Client.Address.empty()) { Cbo << "address" << std::string_view(Client.Address); } Cbo << "dt" << Client.LastSeen.GetElapsedTimeMs(); if (Client.Metadata) { Cbo << "metadata" << Client.Metadata; } Cbo.EndObject(); } }); Cbo.EndArray(); return Cbo.Save(); } CbObject OrchestratorService::GetClientHistory(int Limit) { ZEN_TRACE_CPU("OrchestratorService::GetClientHistory"); if (Limit <= 0) { Limit = 100; } CbObjectWriter Cbo; Cbo.BeginArray("client_events"); m_ClientLogLock.WithSharedLock([&] { int Count = 0; for (auto It = m_ClientLog.rbegin(); It != m_ClientLog.rend() && Count < Limit; ++It, ++Count) { const auto& Evt = *It; Cbo.BeginObject(); switch (Evt.EventType) { case ClientEvent::Type::Connected: Cbo << "type" << "connected"; break; case ClientEvent::Type::Disconnected: Cbo << "type" << "disconnected"; break; case ClientEvent::Type::Updated: Cbo << "type" << "updated"; break; } Cbo.AddDateTime("ts", Evt.Timestamp); Cbo << "client_id" << std::string_view(Evt.ClientId); Cbo << "hostname" << std::string_view(Evt.Hostname); Cbo.EndObject(); } }); Cbo.EndArray(); return Cbo.Save(); } void OrchestratorService::RecordClientEvent(ClientEvent::Type Type, std::string_view ClientId, std::string_view Hostname) { ClientEvent Evt{ .EventType = Type, .Timestamp = DateTime::Now(), .ClientId = std::string(ClientId), .Hostname = std::string(Hostname), }; m_ClientLogLock.WithExclusiveLock([&] { m_ClientLog.push_back(std::move(Evt)); while (m_ClientLog.size() > kMaxClientEvents) { m_ClientLog.pop_front(); } }); } void OrchestratorService::ProbeThreadFunction() { ZEN_TRACE_CPU("OrchestratorService::ProbeThreadFunction"); SetCurrentThreadName("orch_probe"); bool IsFirstProbe = true; do { if (!IsFirstProbe) { m_ProbeThreadEvent.Wait(5'000); m_ProbeThreadEvent.Reset(); } else { IsFirstProbe = false; } if (m_ProbeThreadEnabled == false) { return; } m_ProbeThreadEvent.Reset(); // Snapshot worker IDs and URIs under shared lock struct WorkerSnapshot { std::string Id; std::string Uri; bool WsConnected = false; }; std::vector Snapshots; m_KnownWorkersLock.WithSharedLock([&] { Snapshots.reserve(m_KnownWorkers.size()); for (const auto& [WorkerId, Worker] : m_KnownWorkers) { Snapshots.push_back({WorkerId, Worker.BaseUri, Worker.WsConnected}); } }); // Probe each worker outside the lock for (const auto& Snap : Snapshots) { if (m_ProbeThreadEnabled == false) { return; } // Workers with an active WebSocket connection are known-reachable; // skip the HTTP health probe for them. if (Snap.WsConnected) { continue; } ReachableState NewState = ReachableState::Unreachable; try { HttpClient Client(Snap.Uri, {.ConnectTimeout = std::chrono::milliseconds{3000}, .Timeout = std::chrono::milliseconds{5000}}); HttpClient::Response Response = Client.Get("/health/"); if (Response.IsSuccess()) { NewState = ReachableState::Reachable; } } catch (const std::exception& Ex) { ZEN_WARN("probe failed for worker {} ({}): {}", Snap.Id, Snap.Uri, Ex.what()); } ReachableState PrevState = ReachableState::Unknown; std::string WorkerHostname; m_KnownWorkersLock.WithExclusiveLock([&] { auto It = m_KnownWorkers.find(Snap.Id); if (It != m_KnownWorkers.end()) { PrevState = It->second.Reachable; WorkerHostname = It->second.Hostname; It->second.Reachable = NewState; It->second.LastProbed.Reset(); if (PrevState != NewState) { if (NewState == ReachableState::Reachable && PrevState == ReachableState::Unreachable) { ZEN_INFO("worker {} ({}) is reachable again", Snap.Id, Snap.Uri); } else if (NewState == ReachableState::Reachable) { ZEN_INFO("worker {} ({}) is now reachable", Snap.Id, Snap.Uri); } else if (PrevState == ReachableState::Reachable) { ZEN_WARN("worker {} ({}) is no longer reachable", Snap.Id, Snap.Uri); } else { ZEN_WARN("worker {} ({}) is not reachable", Snap.Id, Snap.Uri); } } } }); // Record provisioning events for state transitions outside the lock if (PrevState != NewState) { if (NewState == ReachableState::Unreachable && PrevState == ReachableState::Reachable) { RecordProvisioningEvent(ProvisioningEvent::Type::Left, Snap.Id, WorkerHostname); } else if (NewState == ReachableState::Reachable && PrevState == ReachableState::Unreachable) { RecordProvisioningEvent(ProvisioningEvent::Type::Returned, Snap.Id, WorkerHostname); } } } // Sweep expired clients (5-minute timeout) static constexpr int64_t kClientTimeoutMs = 5 * 60 * 1000; struct ExpiredClient { std::string Id; std::string Hostname; }; std::vector ExpiredClients; m_KnownClientsLock.WithExclusiveLock([&] { for (auto It = m_KnownClients.begin(); It != m_KnownClients.end();) { if (It->second.LastSeen.GetElapsedTimeMs() > kClientTimeoutMs) { ExpiredClients.push_back({It->first, It->second.Hostname}); It = m_KnownClients.erase(It); } else { ++It; } } }); for (const auto& Expired : ExpiredClients) { ZEN_INFO("client {} timed out (no announcement for >5 minutes)", Expired.Id); RecordClientEvent(ClientEvent::Type::Disconnected, Expired.Id, Expired.Hostname); } } while (m_ProbeThreadEnabled); } } // namespace zen::compute #endif