aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/timeline/workertimeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/timeline/workertimeline.cpp')
-rw-r--r--src/zencompute/timeline/workertimeline.cpp430
1 files changed, 430 insertions, 0 deletions
diff --git a/src/zencompute/timeline/workertimeline.cpp b/src/zencompute/timeline/workertimeline.cpp
new file mode 100644
index 000000000..88ef5b62d
--- /dev/null
+++ b/src/zencompute/timeline/workertimeline.cpp
@@ -0,0 +1,430 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "workertimeline.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/basicfile.h>
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinaryfile.h>
+
+# include <algorithm>
+
+namespace zen::compute {
+
+WorkerTimeline::WorkerTimeline(std::string_view WorkerId) : m_WorkerId(WorkerId)
+{
+}
+
+WorkerTimeline::~WorkerTimeline()
+{
+}
+
+void
+WorkerTimeline::RecordProvisioned()
+{
+ AppendEvent({
+ .Type = EventType::WorkerProvisioned,
+ .Timestamp = DateTime::Now(),
+ });
+}
+
+void
+WorkerTimeline::RecordDeprovisioned()
+{
+ AppendEvent({
+ .Type = EventType::WorkerDeprovisioned,
+ .Timestamp = DateTime::Now(),
+ });
+}
+
+void
+WorkerTimeline::RecordActionAccepted(int ActionLsn, const IoHash& ActionId)
+{
+ AppendEvent({
+ .Type = EventType::ActionAccepted,
+ .Timestamp = DateTime::Now(),
+ .ActionLsn = ActionLsn,
+ .ActionId = ActionId,
+ });
+}
+
+void
+WorkerTimeline::RecordActionRejected(int ActionLsn, const IoHash& ActionId, std::string_view Reason)
+{
+ AppendEvent({
+ .Type = EventType::ActionRejected,
+ .Timestamp = DateTime::Now(),
+ .ActionLsn = ActionLsn,
+ .ActionId = ActionId,
+ .Reason = std::string(Reason),
+ });
+}
+
+void
+WorkerTimeline::RecordActionStateChanged(int ActionLsn,
+ const IoHash& ActionId,
+ RunnerAction::State PreviousState,
+ RunnerAction::State NewState)
+{
+ AppendEvent({
+ .Type = EventType::ActionStateChanged,
+ .Timestamp = DateTime::Now(),
+ .ActionLsn = ActionLsn,
+ .ActionId = ActionId,
+ .ActionState = NewState,
+ .PreviousState = PreviousState,
+ });
+}
+
+std::vector<WorkerTimeline::Event>
+WorkerTimeline::QueryTimeline(DateTime StartTime, DateTime EndTime) const
+{
+ std::vector<Event> Result;
+
+ m_EventsLock.WithSharedLock([&] {
+ for (const auto& Evt : m_Events)
+ {
+ if (Evt.Timestamp >= StartTime && Evt.Timestamp <= EndTime)
+ {
+ Result.push_back(Evt);
+ }
+ }
+ });
+
+ return Result;
+}
+
+std::vector<WorkerTimeline::Event>
+WorkerTimeline::QueryRecent(int Limit) const
+{
+ std::vector<Event> Result;
+
+ m_EventsLock.WithSharedLock([&] {
+ const int Count = std::min(Limit, gsl::narrow<int>(m_Events.size()));
+ auto It = m_Events.end() - Count;
+ Result.assign(It, m_Events.end());
+ });
+
+ return Result;
+}
+
+size_t
+WorkerTimeline::GetEventCount() const
+{
+ size_t Count = 0;
+ m_EventsLock.WithSharedLock([&] { Count = m_Events.size(); });
+ return Count;
+}
+
+WorkerTimeline::TimeRange
+WorkerTimeline::GetTimeRange() const
+{
+ TimeRange Range;
+ m_EventsLock.WithSharedLock([&] {
+ if (!m_Events.empty())
+ {
+ Range.First = m_Events.front().Timestamp;
+ Range.Last = m_Events.back().Timestamp;
+ }
+ });
+ return Range;
+}
+
+void
+WorkerTimeline::AppendEvent(Event&& Evt)
+{
+ m_EventsLock.WithExclusiveLock([&] {
+ while (m_Events.size() >= m_MaxEvents)
+ {
+ m_Events.pop_front();
+ }
+
+ m_Events.push_back(std::move(Evt));
+ });
+}
+
+const char*
+WorkerTimeline::ToString(EventType Type)
+{
+ switch (Type)
+ {
+ case EventType::WorkerProvisioned:
+ return "provisioned";
+ case EventType::WorkerDeprovisioned:
+ return "deprovisioned";
+ case EventType::ActionAccepted:
+ return "accepted";
+ case EventType::ActionRejected:
+ return "rejected";
+ case EventType::ActionStateChanged:
+ return "state_changed";
+ default:
+ return "unknown";
+ }
+}
+
+static WorkerTimeline::EventType
+EventTypeFromString(std::string_view Str)
+{
+ if (Str == "provisioned")
+ return WorkerTimeline::EventType::WorkerProvisioned;
+ if (Str == "deprovisioned")
+ return WorkerTimeline::EventType::WorkerDeprovisioned;
+ if (Str == "accepted")
+ return WorkerTimeline::EventType::ActionAccepted;
+ if (Str == "rejected")
+ return WorkerTimeline::EventType::ActionRejected;
+ if (Str == "state_changed")
+ return WorkerTimeline::EventType::ActionStateChanged;
+ return WorkerTimeline::EventType::WorkerProvisioned;
+}
+
+void
+WorkerTimeline::WriteTo(const std::filesystem::path& Path) const
+{
+ CbObjectWriter Cbo;
+ Cbo << "worker_id" << m_WorkerId;
+
+ m_EventsLock.WithSharedLock([&] {
+ if (!m_Events.empty())
+ {
+ Cbo.AddDateTime("time_first", m_Events.front().Timestamp);
+ Cbo.AddDateTime("time_last", m_Events.back().Timestamp);
+ }
+
+ Cbo.BeginArray("events");
+ for (const auto& Evt : m_Events)
+ {
+ Cbo.BeginObject();
+ Cbo << "type" << ToString(Evt.Type);
+ Cbo.AddDateTime("ts", Evt.Timestamp);
+
+ if (Evt.ActionLsn != 0)
+ {
+ Cbo << "lsn" << Evt.ActionLsn;
+ Cbo << "action_id" << Evt.ActionId;
+ }
+
+ if (Evt.Type == EventType::ActionStateChanged)
+ {
+ Cbo << "prev_state" << static_cast<int32_t>(Evt.PreviousState);
+ Cbo << "state" << static_cast<int32_t>(Evt.ActionState);
+ }
+
+ if (!Evt.Reason.empty())
+ {
+ Cbo << "reason" << std::string_view(Evt.Reason);
+ }
+
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+ });
+
+ CbObject Obj = Cbo.Save();
+
+ BasicFile File(Path, BasicFile::Mode::kTruncate);
+ File.Write(Obj.GetBuffer().GetView(), 0);
+}
+
+void
+WorkerTimeline::ReadFrom(const std::filesystem::path& Path)
+{
+ CbObjectFromFile Loaded = LoadCompactBinaryObject(Path);
+ CbObject Root = std::move(Loaded.Object);
+
+ if (!Root)
+ {
+ return;
+ }
+
+ std::deque<Event> LoadedEvents;
+
+ for (CbFieldView Field : Root["events"].AsArrayView())
+ {
+ CbObjectView EventObj = Field.AsObjectView();
+
+ Event Evt;
+ Evt.Type = EventTypeFromString(EventObj["type"].AsString());
+ Evt.Timestamp = EventObj["ts"].AsDateTime();
+
+ Evt.ActionLsn = EventObj["lsn"].AsInt32();
+ Evt.ActionId = EventObj["action_id"].AsHash();
+
+ if (Evt.Type == EventType::ActionStateChanged)
+ {
+ Evt.PreviousState = static_cast<RunnerAction::State>(EventObj["prev_state"].AsInt32());
+ Evt.ActionState = static_cast<RunnerAction::State>(EventObj["state"].AsInt32());
+ }
+
+ std::string_view Reason = EventObj["reason"].AsString();
+ if (!Reason.empty())
+ {
+ Evt.Reason = std::string(Reason);
+ }
+
+ LoadedEvents.push_back(std::move(Evt));
+ }
+
+ m_EventsLock.WithExclusiveLock([&] { m_Events = std::move(LoadedEvents); });
+}
+
+WorkerTimeline::TimeRange
+WorkerTimeline::ReadTimeRange(const std::filesystem::path& Path)
+{
+ CbObjectFromFile Loaded = LoadCompactBinaryObject(Path);
+
+ if (!Loaded.Object)
+ {
+ return {};
+ }
+
+ return {
+ .First = Loaded.Object["time_first"].AsDateTime(),
+ .Last = Loaded.Object["time_last"].AsDateTime(),
+ };
+}
+
+// WorkerTimelineStore
+
+static constexpr std::string_view kTimelineExtension = ".ztimeline";
+
+WorkerTimelineStore::WorkerTimelineStore(std::filesystem::path PersistenceDir) : m_PersistenceDir(std::move(PersistenceDir))
+{
+ std::error_code Ec;
+ std::filesystem::create_directories(m_PersistenceDir, Ec);
+}
+
+Ref<WorkerTimeline>
+WorkerTimelineStore::GetOrCreate(std::string_view WorkerId)
+{
+ // Fast path: check if it already exists in memory
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ auto It = m_Timelines.find(std::string(WorkerId));
+ if (It != m_Timelines.end())
+ {
+ return It->second;
+ }
+ }
+
+ // Slow path: create under exclusive lock, loading from disk if available
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ auto& Entry = m_Timelines[std::string(WorkerId)];
+ if (!Entry)
+ {
+ Entry = Ref<WorkerTimeline>(new WorkerTimeline(WorkerId));
+
+ std::filesystem::path Path = TimelinePath(WorkerId);
+ std::error_code Ec;
+ if (std::filesystem::is_regular_file(Path, Ec))
+ {
+ Entry->ReadFrom(Path);
+ }
+ }
+ return Entry;
+}
+
+Ref<WorkerTimeline>
+WorkerTimelineStore::Find(std::string_view WorkerId)
+{
+ RwLock::SharedLockScope _(m_Lock);
+ auto It = m_Timelines.find(std::string(WorkerId));
+ if (It != m_Timelines.end())
+ {
+ return It->second;
+ }
+ return {};
+}
+
+std::vector<std::string>
+WorkerTimelineStore::GetActiveWorkerIds() const
+{
+ std::vector<std::string> Result;
+
+ RwLock::SharedLockScope $(m_Lock);
+ Result.reserve(m_Timelines.size());
+ for (const auto& [Id, _] : m_Timelines)
+ {
+ Result.push_back(Id);
+ }
+
+ return Result;
+}
+
+std::vector<WorkerTimelineStore::WorkerTimelineInfo>
+WorkerTimelineStore::GetAllWorkerInfo() const
+{
+ std::unordered_map<std::string, WorkerTimeline::TimeRange> InfoMap;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ for (const auto& [Id, Timeline] : m_Timelines)
+ {
+ InfoMap[Id] = Timeline->GetTimeRange();
+ }
+ }
+
+ std::error_code Ec;
+ for (const auto& Entry : std::filesystem::directory_iterator(m_PersistenceDir, Ec))
+ {
+ if (!Entry.is_regular_file())
+ {
+ continue;
+ }
+
+ const auto& Path = Entry.path();
+ if (Path.extension().string() != kTimelineExtension)
+ {
+ continue;
+ }
+
+ std::string Id = Path.stem().string();
+ if (InfoMap.find(Id) == InfoMap.end())
+ {
+ InfoMap[Id] = WorkerTimeline::ReadTimeRange(Path);
+ }
+ }
+
+ std::vector<WorkerTimelineInfo> Result;
+ Result.reserve(InfoMap.size());
+ for (auto& [Id, Range] : InfoMap)
+ {
+ Result.push_back({.WorkerId = std::move(Id), .Range = Range});
+ }
+ return Result;
+}
+
+void
+WorkerTimelineStore::Save(std::string_view WorkerId)
+{
+ RwLock::SharedLockScope _(m_Lock);
+ auto It = m_Timelines.find(std::string(WorkerId));
+ if (It != m_Timelines.end())
+ {
+ It->second->WriteTo(TimelinePath(WorkerId));
+ }
+}
+
+void
+WorkerTimelineStore::SaveAll()
+{
+ RwLock::SharedLockScope _(m_Lock);
+ for (const auto& [Id, Timeline] : m_Timelines)
+ {
+ Timeline->WriteTo(TimelinePath(Id));
+ }
+}
+
+std::filesystem::path
+WorkerTimelineStore::TimelinePath(std::string_view WorkerId) const
+{
+ return m_PersistenceDir / (std::string(WorkerId) + std::string(kTimelineExtension));
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES