aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/orchestratorservice.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/orchestratorservice.cpp')
-rw-r--r--src/zencompute/orchestratorservice.cpp710
1 files changed, 710 insertions, 0 deletions
diff --git a/src/zencompute/orchestratorservice.cpp b/src/zencompute/orchestratorservice.cpp
new file mode 100644
index 000000000..9ea695305
--- /dev/null
+++ b/src/zencompute/orchestratorservice.cpp
@@ -0,0 +1,710 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencompute/orchestratorservice.h>
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/logging.h>
+# include <zencore/trace.h>
+# include <zenhttp/httpclient.h>
+
+# include "timeline/workertimeline.h"
+
+namespace zen::compute {
+
+OrchestratorService::OrchestratorService(std::filesystem::path DataDir, bool EnableWorkerWebSocket)
+: m_TimelineStore(std::make_unique<WorkerTimelineStore>(DataDir / "timelines"))
+, m_EnableWorkerWebSocket(EnableWorkerWebSocket)
+{
+ m_ProbeThread = std::thread{&OrchestratorService::ProbeThreadFunction, this};
+}
+
+OrchestratorService::~OrchestratorService()
+{
+ m_ProbeThreadEnabled = false;
+ m_ProbeThreadEvent.Set();
+ if (m_ProbeThread.joinable())
+ {
+ m_ProbeThread.join();
+ }
+}
+
+CbObject
+OrchestratorService::GetWorkerList()
+{
+ ZEN_TRACE_CPU("OrchestratorService::GetWorkerList");
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("workers");
+
+ m_KnownWorkersLock.WithSharedLock([&] {
+ for (const auto& [WorkerId, Worker] : m_KnownWorkers)
+ {
+ Cbo.BeginObject();
+ Cbo << "id" << WorkerId;
+ Cbo << "uri" << Worker.BaseUri;
+ Cbo << "hostname" << Worker.Hostname;
+ if (!Worker.Platform.empty())
+ {
+ Cbo << "platform" << std::string_view(Worker.Platform);
+ }
+ Cbo << "cpus" << Worker.Cpus;
+ Cbo << "cpu_usage" << Worker.CpuUsagePercent;
+ Cbo << "memory_total" << Worker.MemoryTotalBytes;
+ Cbo << "memory_used" << Worker.MemoryUsedBytes;
+ Cbo << "bytes_received" << Worker.BytesReceived;
+ Cbo << "bytes_sent" << Worker.BytesSent;
+ Cbo << "actions_pending" << Worker.ActionsPending;
+ Cbo << "actions_running" << Worker.ActionsRunning;
+ Cbo << "actions_completed" << Worker.ActionsCompleted;
+ Cbo << "active_queues" << Worker.ActiveQueues;
+ if (!Worker.Provisioner.empty())
+ {
+ Cbo << "provisioner" << std::string_view(Worker.Provisioner);
+ }
+ if (Worker.Reachable != ReachableState::Unknown)
+ {
+ Cbo << "reachable" << (Worker.Reachable == ReachableState::Reachable);
+ }
+ if (Worker.WsConnected)
+ {
+ Cbo << "ws_connected" << true;
+ }
+ Cbo << "dt" << Worker.LastSeen.GetElapsedTimeMs();
+ Cbo.EndObject();
+ }
+ });
+
+ Cbo.EndArray();
+ return Cbo.Save();
+}
+
+void
+OrchestratorService::AnnounceWorker(const WorkerAnnouncement& Ann)
+{
+ ZEN_TRACE_CPU("OrchestratorService::AnnounceWorker");
+
+ bool IsNew = false;
+ std::string EvictedId;
+ std::string EvictedHostname;
+
+ m_KnownWorkersLock.WithExclusiveLock([&] {
+ IsNew = (m_KnownWorkers.find(std::string(Ann.Id)) == m_KnownWorkers.end());
+
+ // If a different worker ID already maps to the same URI, the old entry
+ // is stale (e.g. a previous Horde lease on the same machine). Remove it
+ // so the dashboard doesn't show duplicates.
+ if (IsNew)
+ {
+ for (auto It = m_KnownWorkers.begin(); It != m_KnownWorkers.end(); ++It)
+ {
+ if (It->second.BaseUri == Ann.Uri && It->first != Ann.Id)
+ {
+ EvictedId = It->first;
+ EvictedHostname = It->second.Hostname;
+ m_KnownWorkers.erase(It);
+ break;
+ }
+ }
+ }
+
+ auto& Worker = m_KnownWorkers[std::string(Ann.Id)];
+ Worker.BaseUri = Ann.Uri;
+ Worker.Hostname = Ann.Hostname;
+ if (!Ann.Platform.empty())
+ {
+ Worker.Platform = Ann.Platform;
+ }
+ Worker.Cpus = Ann.Cpus;
+ Worker.CpuUsagePercent = Ann.CpuUsagePercent;
+ Worker.MemoryTotalBytes = Ann.MemoryTotalBytes;
+ Worker.MemoryUsedBytes = Ann.MemoryUsedBytes;
+ Worker.BytesReceived = Ann.BytesReceived;
+ Worker.BytesSent = Ann.BytesSent;
+ Worker.ActionsPending = Ann.ActionsPending;
+ Worker.ActionsRunning = Ann.ActionsRunning;
+ Worker.ActionsCompleted = Ann.ActionsCompleted;
+ Worker.ActiveQueues = Ann.ActiveQueues;
+ if (!Ann.Provisioner.empty())
+ {
+ Worker.Provisioner = Ann.Provisioner;
+ }
+ Worker.LastSeen.Reset();
+ });
+
+ if (!EvictedId.empty())
+ {
+ ZEN_INFO("worker {} superseded by {} (same endpoint)", EvictedId, Ann.Id);
+ RecordProvisioningEvent(ProvisioningEvent::Type::Left, EvictedId, EvictedHostname);
+ }
+
+ if (IsNew)
+ {
+ RecordProvisioningEvent(ProvisioningEvent::Type::Joined, Ann.Id, Ann.Hostname);
+ }
+}
+
+bool
+OrchestratorService::IsWorkerWebSocketEnabled() const
+{
+ return m_EnableWorkerWebSocket;
+}
+
+void
+OrchestratorService::SetWorkerWebSocketConnected(std::string_view WorkerId, bool Connected)
+{
+ ReachableState PrevState = ReachableState::Unknown;
+ std::string WorkerHostname;
+
+ m_KnownWorkersLock.WithExclusiveLock([&] {
+ auto It = m_KnownWorkers.find(std::string(WorkerId));
+ if (It == m_KnownWorkers.end())
+ {
+ return;
+ }
+
+ PrevState = It->second.Reachable;
+ WorkerHostname = It->second.Hostname;
+ It->second.WsConnected = Connected;
+ It->second.Reachable = Connected ? ReachableState::Reachable : ReachableState::Unreachable;
+
+ if (Connected)
+ {
+ ZEN_INFO("worker {} WebSocket connected — marking reachable", WorkerId);
+ }
+ else
+ {
+ ZEN_WARN("worker {} WebSocket disconnected — marking unreachable", WorkerId);
+ }
+ });
+
+ // Record provisioning events for state transitions outside the lock
+ if (Connected && PrevState == ReachableState::Unreachable)
+ {
+ RecordProvisioningEvent(ProvisioningEvent::Type::Returned, WorkerId, WorkerHostname);
+ }
+ else if (!Connected && PrevState == ReachableState::Reachable)
+ {
+ RecordProvisioningEvent(ProvisioningEvent::Type::Left, WorkerId, WorkerHostname);
+ }
+}
+
+CbObject
+OrchestratorService::GetWorkerTimeline(std::string_view WorkerId, std::optional<DateTime> From, std::optional<DateTime> To, int Limit)
+{
+ ZEN_TRACE_CPU("OrchestratorService::GetWorkerTimeline");
+
+ Ref<WorkerTimeline> Timeline = m_TimelineStore->Find(WorkerId);
+ if (!Timeline)
+ {
+ return {};
+ }
+
+ std::vector<WorkerTimeline::Event> Events;
+
+ if (From || To)
+ {
+ DateTime StartTime = From.value_or(DateTime(0));
+ DateTime EndTime = To.value_or(DateTime::Now());
+ Events = Timeline->QueryTimeline(StartTime, EndTime);
+ }
+ else if (Limit > 0)
+ {
+ Events = Timeline->QueryRecent(Limit);
+ }
+ else
+ {
+ Events = Timeline->QueryRecent();
+ }
+
+ WorkerTimeline::TimeRange Range = Timeline->GetTimeRange();
+
+ CbObjectWriter Cbo;
+ Cbo << "worker_id" << WorkerId;
+ Cbo << "event_count" << static_cast<int32_t>(Timeline->GetEventCount());
+
+ if (Range)
+ {
+ Cbo.AddDateTime("time_first", Range.First);
+ Cbo.AddDateTime("time_last", Range.Last);
+ }
+
+ Cbo.BeginArray("events");
+ for (const auto& Evt : Events)
+ {
+ Cbo.BeginObject();
+ Cbo << "type" << WorkerTimeline::ToString(Evt.Type);
+ Cbo.AddDateTime("ts", Evt.Timestamp);
+
+ if (Evt.ActionLsn != 0)
+ {
+ Cbo << "lsn" << Evt.ActionLsn;
+ Cbo << "action_id" << Evt.ActionId;
+ }
+
+ if (Evt.Type == WorkerTimeline::EventType::ActionStateChanged)
+ {
+ Cbo << "prev_state" << RunnerAction::ToString(Evt.PreviousState);
+ Cbo << "state" << RunnerAction::ToString(Evt.ActionState);
+ }
+
+ if (!Evt.Reason.empty())
+ {
+ Cbo << "reason" << std::string_view(Evt.Reason);
+ }
+
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+
+ return Cbo.Save();
+}
+
+CbObject
+OrchestratorService::GetAllTimelines(DateTime From, DateTime To)
+{
+ ZEN_TRACE_CPU("OrchestratorService::GetAllTimelines");
+
+ DateTime StartTime = From;
+ DateTime EndTime = To;
+
+ auto AllInfo = m_TimelineStore->GetAllWorkerInfo();
+
+ CbObjectWriter Cbo;
+ Cbo.AddDateTime("from", StartTime);
+ Cbo.AddDateTime("to", EndTime);
+
+ Cbo.BeginArray("workers");
+ for (const auto& Info : AllInfo)
+ {
+ if (!Info.Range || Info.Range.Last < StartTime || Info.Range.First > EndTime)
+ {
+ continue;
+ }
+
+ Cbo.BeginObject();
+ Cbo << "worker_id" << Info.WorkerId;
+ Cbo.AddDateTime("time_first", Info.Range.First);
+ Cbo.AddDateTime("time_last", Info.Range.Last);
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+
+ return Cbo.Save();
+}
+
+void
+OrchestratorService::RecordProvisioningEvent(ProvisioningEvent::Type Type, std::string_view WorkerId, std::string_view Hostname)
+{
+ ProvisioningEvent Evt{
+ .EventType = Type,
+ .Timestamp = DateTime::Now(),
+ .WorkerId = std::string(WorkerId),
+ .Hostname = std::string(Hostname),
+ };
+
+ m_ProvisioningLogLock.WithExclusiveLock([&] {
+ m_ProvisioningLog.push_back(std::move(Evt));
+ while (m_ProvisioningLog.size() > kMaxProvisioningEvents)
+ {
+ m_ProvisioningLog.pop_front();
+ }
+ });
+}
+
+CbObject
+OrchestratorService::GetProvisioningHistory(int Limit)
+{
+ ZEN_TRACE_CPU("OrchestratorService::GetProvisioningHistory");
+
+ if (Limit <= 0)
+ {
+ Limit = 100;
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("events");
+
+ m_ProvisioningLogLock.WithSharedLock([&] {
+ // Return last N events, newest first
+ int Count = 0;
+ for (auto It = m_ProvisioningLog.rbegin(); It != m_ProvisioningLog.rend() && Count < Limit; ++It, ++Count)
+ {
+ const auto& Evt = *It;
+ Cbo.BeginObject();
+
+ switch (Evt.EventType)
+ {
+ case ProvisioningEvent::Type::Joined:
+ Cbo << "type"
+ << "joined";
+ break;
+ case ProvisioningEvent::Type::Left:
+ Cbo << "type"
+ << "left";
+ break;
+ case ProvisioningEvent::Type::Returned:
+ Cbo << "type"
+ << "returned";
+ break;
+ }
+
+ Cbo.AddDateTime("ts", Evt.Timestamp);
+ Cbo << "worker_id" << std::string_view(Evt.WorkerId);
+ Cbo << "hostname" << std::string_view(Evt.Hostname);
+ Cbo.EndObject();
+ }
+ });
+
+ Cbo.EndArray();
+ return Cbo.Save();
+}
+
+std::string
+OrchestratorService::AnnounceClient(const ClientAnnouncement& Ann)
+{
+ ZEN_TRACE_CPU("OrchestratorService::AnnounceClient");
+
+ std::string ClientId = fmt::format("client-{}", Oid::NewOid().ToString());
+
+ bool IsNew = false;
+
+ m_KnownClientsLock.WithExclusiveLock([&] {
+ auto It = m_KnownClients.find(ClientId);
+ IsNew = (It == m_KnownClients.end());
+
+ auto& Client = m_KnownClients[ClientId];
+ Client.SessionId = Ann.SessionId;
+ Client.Hostname = Ann.Hostname;
+ if (!Ann.Address.empty())
+ {
+ Client.Address = Ann.Address;
+ }
+ if (Ann.Metadata)
+ {
+ Client.Metadata = Ann.Metadata;
+ }
+ Client.LastSeen.Reset();
+ });
+
+ if (IsNew)
+ {
+ RecordClientEvent(ClientEvent::Type::Connected, ClientId, Ann.Hostname);
+ }
+ else
+ {
+ RecordClientEvent(ClientEvent::Type::Updated, ClientId, Ann.Hostname);
+ }
+
+ return ClientId;
+}
+
+bool
+OrchestratorService::UpdateClient(std::string_view ClientId, CbObject Metadata)
+{
+ ZEN_TRACE_CPU("OrchestratorService::UpdateClient");
+
+ bool Found = false;
+
+ m_KnownClientsLock.WithExclusiveLock([&] {
+ auto It = m_KnownClients.find(std::string(ClientId));
+ if (It != m_KnownClients.end())
+ {
+ Found = true;
+ if (Metadata)
+ {
+ It->second.Metadata = std::move(Metadata);
+ }
+ It->second.LastSeen.Reset();
+ }
+ });
+
+ return Found;
+}
+
+bool
+OrchestratorService::CompleteClient(std::string_view ClientId)
+{
+ ZEN_TRACE_CPU("OrchestratorService::CompleteClient");
+
+ std::string Hostname;
+ bool Found = false;
+
+ m_KnownClientsLock.WithExclusiveLock([&] {
+ auto It = m_KnownClients.find(std::string(ClientId));
+ if (It != m_KnownClients.end())
+ {
+ Found = true;
+ Hostname = It->second.Hostname;
+ m_KnownClients.erase(It);
+ }
+ });
+
+ if (Found)
+ {
+ RecordClientEvent(ClientEvent::Type::Disconnected, ClientId, Hostname);
+ }
+
+ return Found;
+}
+
+CbObject
+OrchestratorService::GetClientList()
+{
+ ZEN_TRACE_CPU("OrchestratorService::GetClientList");
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("clients");
+
+ m_KnownClientsLock.WithSharedLock([&] {
+ for (const auto& [ClientId, Client] : m_KnownClients)
+ {
+ Cbo.BeginObject();
+ Cbo << "id" << ClientId;
+ if (Client.SessionId)
+ {
+ Cbo << "session_id" << Client.SessionId;
+ }
+ Cbo << "hostname" << std::string_view(Client.Hostname);
+ if (!Client.Address.empty())
+ {
+ Cbo << "address" << std::string_view(Client.Address);
+ }
+ Cbo << "dt" << Client.LastSeen.GetElapsedTimeMs();
+ if (Client.Metadata)
+ {
+ Cbo << "metadata" << Client.Metadata;
+ }
+ Cbo.EndObject();
+ }
+ });
+
+ Cbo.EndArray();
+ return Cbo.Save();
+}
+
+CbObject
+OrchestratorService::GetClientHistory(int Limit)
+{
+ ZEN_TRACE_CPU("OrchestratorService::GetClientHistory");
+
+ if (Limit <= 0)
+ {
+ Limit = 100;
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("client_events");
+
+ m_ClientLogLock.WithSharedLock([&] {
+ int Count = 0;
+ for (auto It = m_ClientLog.rbegin(); It != m_ClientLog.rend() && Count < Limit; ++It, ++Count)
+ {
+ const auto& Evt = *It;
+ Cbo.BeginObject();
+
+ switch (Evt.EventType)
+ {
+ case ClientEvent::Type::Connected:
+ Cbo << "type"
+ << "connected";
+ break;
+ case ClientEvent::Type::Disconnected:
+ Cbo << "type"
+ << "disconnected";
+ break;
+ case ClientEvent::Type::Updated:
+ Cbo << "type"
+ << "updated";
+ break;
+ }
+
+ Cbo.AddDateTime("ts", Evt.Timestamp);
+ Cbo << "client_id" << std::string_view(Evt.ClientId);
+ Cbo << "hostname" << std::string_view(Evt.Hostname);
+ Cbo.EndObject();
+ }
+ });
+
+ Cbo.EndArray();
+ return Cbo.Save();
+}
+
+void
+OrchestratorService::RecordClientEvent(ClientEvent::Type Type, std::string_view ClientId, std::string_view Hostname)
+{
+ ClientEvent Evt{
+ .EventType = Type,
+ .Timestamp = DateTime::Now(),
+ .ClientId = std::string(ClientId),
+ .Hostname = std::string(Hostname),
+ };
+
+ m_ClientLogLock.WithExclusiveLock([&] {
+ m_ClientLog.push_back(std::move(Evt));
+ while (m_ClientLog.size() > kMaxClientEvents)
+ {
+ m_ClientLog.pop_front();
+ }
+ });
+}
+
+void
+OrchestratorService::ProbeThreadFunction()
+{
+ ZEN_TRACE_CPU("OrchestratorService::ProbeThreadFunction");
+ SetCurrentThreadName("orch_probe");
+
+ bool IsFirstProbe = true;
+
+ do
+ {
+ if (!IsFirstProbe)
+ {
+ m_ProbeThreadEvent.Wait(5'000);
+ m_ProbeThreadEvent.Reset();
+ }
+ else
+ {
+ IsFirstProbe = false;
+ }
+
+ if (m_ProbeThreadEnabled == false)
+ {
+ return;
+ }
+
+ m_ProbeThreadEvent.Reset();
+
+ // Snapshot worker IDs and URIs under shared lock
+ struct WorkerSnapshot
+ {
+ std::string Id;
+ std::string Uri;
+ bool WsConnected = false;
+ };
+ std::vector<WorkerSnapshot> Snapshots;
+
+ m_KnownWorkersLock.WithSharedLock([&] {
+ Snapshots.reserve(m_KnownWorkers.size());
+ for (const auto& [WorkerId, Worker] : m_KnownWorkers)
+ {
+ Snapshots.push_back({WorkerId, Worker.BaseUri, Worker.WsConnected});
+ }
+ });
+
+ // Probe each worker outside the lock
+ for (const auto& Snap : Snapshots)
+ {
+ if (m_ProbeThreadEnabled == false)
+ {
+ return;
+ }
+
+ // Workers with an active WebSocket connection are known-reachable;
+ // skip the HTTP health probe for them.
+ if (Snap.WsConnected)
+ {
+ continue;
+ }
+
+ ReachableState NewState = ReachableState::Unreachable;
+
+ try
+ {
+ HttpClient Client(Snap.Uri,
+ {.ConnectTimeout = std::chrono::milliseconds{3000}, .Timeout = std::chrono::milliseconds{5000}});
+ HttpClient::Response Response = Client.Get("/health/");
+ if (Response.IsSuccess())
+ {
+ NewState = ReachableState::Reachable;
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("probe failed for worker {} ({}): {}", Snap.Id, Snap.Uri, Ex.what());
+ }
+
+ ReachableState PrevState = ReachableState::Unknown;
+ std::string WorkerHostname;
+
+ m_KnownWorkersLock.WithExclusiveLock([&] {
+ auto It = m_KnownWorkers.find(Snap.Id);
+ if (It != m_KnownWorkers.end())
+ {
+ PrevState = It->second.Reachable;
+ WorkerHostname = It->second.Hostname;
+ It->second.Reachable = NewState;
+ It->second.LastProbed.Reset();
+
+ if (PrevState != NewState)
+ {
+ if (NewState == ReachableState::Reachable && PrevState == ReachableState::Unreachable)
+ {
+ ZEN_INFO("worker {} ({}) is reachable again", Snap.Id, Snap.Uri);
+ }
+ else if (NewState == ReachableState::Reachable)
+ {
+ ZEN_INFO("worker {} ({}) is now reachable", Snap.Id, Snap.Uri);
+ }
+ else if (PrevState == ReachableState::Reachable)
+ {
+ ZEN_WARN("worker {} ({}) is no longer reachable", Snap.Id, Snap.Uri);
+ }
+ else
+ {
+ ZEN_WARN("worker {} ({}) is not reachable", Snap.Id, Snap.Uri);
+ }
+ }
+ }
+ });
+
+ // Record provisioning events for state transitions outside the lock
+ if (PrevState != NewState)
+ {
+ if (NewState == ReachableState::Unreachable && PrevState == ReachableState::Reachable)
+ {
+ RecordProvisioningEvent(ProvisioningEvent::Type::Left, Snap.Id, WorkerHostname);
+ }
+ else if (NewState == ReachableState::Reachable && PrevState == ReachableState::Unreachable)
+ {
+ RecordProvisioningEvent(ProvisioningEvent::Type::Returned, Snap.Id, WorkerHostname);
+ }
+ }
+ }
+
+ // Sweep expired clients (5-minute timeout)
+ static constexpr int64_t kClientTimeoutMs = 5 * 60 * 1000;
+
+ struct ExpiredClient
+ {
+ std::string Id;
+ std::string Hostname;
+ };
+ std::vector<ExpiredClient> ExpiredClients;
+
+ m_KnownClientsLock.WithExclusiveLock([&] {
+ for (auto It = m_KnownClients.begin(); It != m_KnownClients.end();)
+ {
+ if (It->second.LastSeen.GetElapsedTimeMs() > kClientTimeoutMs)
+ {
+ ExpiredClients.push_back({It->first, It->second.Hostname});
+ It = m_KnownClients.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+ });
+
+ for (const auto& Expired : ExpiredClients)
+ {
+ ZEN_INFO("client {} timed out (no announcement for >5 minutes)", Expired.Id);
+ RecordClientEvent(ClientEvent::Type::Disconnected, Expired.Id, Expired.Hostname);
+ }
+ } while (m_ProbeThreadEnabled);
+}
+
+} // namespace zen::compute
+
+#endif