aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/timeline
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
committerLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
commitd1abc50ee9d4fb72efc646e17decafea741caa34 (patch)
treee4288e00f2f7ca0391b83d986efcb69d3ba66a83 /src/zencompute/timeline
parentAllow requests with invalid content-types unless specified in command line or... (diff)
parentupdated chunk–block analyser (#818) (diff)
downloadzen-d1abc50ee9d4fb72efc646e17decafea741caa34.tar.xz
zen-d1abc50ee9d4fb72efc646e17decafea741caa34.zip
Merge branch 'main' into lm/restrict-content-type
Diffstat (limited to 'src/zencompute/timeline')
-rw-r--r--src/zencompute/timeline/workertimeline.cpp430
-rw-r--r--src/zencompute/timeline/workertimeline.h169
2 files changed, 599 insertions, 0 deletions
diff --git a/src/zencompute/timeline/workertimeline.cpp b/src/zencompute/timeline/workertimeline.cpp
new file mode 100644
index 000000000..88ef5b62d
--- /dev/null
+++ b/src/zencompute/timeline/workertimeline.cpp
@@ -0,0 +1,430 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "workertimeline.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/basicfile.h>
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinaryfile.h>
+
+# include <algorithm>
+
+namespace zen::compute {
+
+WorkerTimeline::WorkerTimeline(std::string_view WorkerId) : m_WorkerId(WorkerId)
+{
+}
+
+WorkerTimeline::~WorkerTimeline()
+{
+}
+
+void
+WorkerTimeline::RecordProvisioned()
+{
+ AppendEvent({
+ .Type = EventType::WorkerProvisioned,
+ .Timestamp = DateTime::Now(),
+ });
+}
+
+void
+WorkerTimeline::RecordDeprovisioned()
+{
+ AppendEvent({
+ .Type = EventType::WorkerDeprovisioned,
+ .Timestamp = DateTime::Now(),
+ });
+}
+
+void
+WorkerTimeline::RecordActionAccepted(int ActionLsn, const IoHash& ActionId)
+{
+ AppendEvent({
+ .Type = EventType::ActionAccepted,
+ .Timestamp = DateTime::Now(),
+ .ActionLsn = ActionLsn,
+ .ActionId = ActionId,
+ });
+}
+
+void
+WorkerTimeline::RecordActionRejected(int ActionLsn, const IoHash& ActionId, std::string_view Reason)
+{
+ AppendEvent({
+ .Type = EventType::ActionRejected,
+ .Timestamp = DateTime::Now(),
+ .ActionLsn = ActionLsn,
+ .ActionId = ActionId,
+ .Reason = std::string(Reason),
+ });
+}
+
+void
+WorkerTimeline::RecordActionStateChanged(int ActionLsn,
+ const IoHash& ActionId,
+ RunnerAction::State PreviousState,
+ RunnerAction::State NewState)
+{
+ AppendEvent({
+ .Type = EventType::ActionStateChanged,
+ .Timestamp = DateTime::Now(),
+ .ActionLsn = ActionLsn,
+ .ActionId = ActionId,
+ .ActionState = NewState,
+ .PreviousState = PreviousState,
+ });
+}
+
+std::vector<WorkerTimeline::Event>
+WorkerTimeline::QueryTimeline(DateTime StartTime, DateTime EndTime) const
+{
+ std::vector<Event> Result;
+
+ m_EventsLock.WithSharedLock([&] {
+ for (const auto& Evt : m_Events)
+ {
+ if (Evt.Timestamp >= StartTime && Evt.Timestamp <= EndTime)
+ {
+ Result.push_back(Evt);
+ }
+ }
+ });
+
+ return Result;
+}
+
+std::vector<WorkerTimeline::Event>
+WorkerTimeline::QueryRecent(int Limit) const
+{
+ std::vector<Event> Result;
+
+ m_EventsLock.WithSharedLock([&] {
+ const int Count = std::min(Limit, gsl::narrow<int>(m_Events.size()));
+ auto It = m_Events.end() - Count;
+ Result.assign(It, m_Events.end());
+ });
+
+ return Result;
+}
+
+size_t
+WorkerTimeline::GetEventCount() const
+{
+ size_t Count = 0;
+ m_EventsLock.WithSharedLock([&] { Count = m_Events.size(); });
+ return Count;
+}
+
+WorkerTimeline::TimeRange
+WorkerTimeline::GetTimeRange() const
+{
+ TimeRange Range;
+ m_EventsLock.WithSharedLock([&] {
+ if (!m_Events.empty())
+ {
+ Range.First = m_Events.front().Timestamp;
+ Range.Last = m_Events.back().Timestamp;
+ }
+ });
+ return Range;
+}
+
+void
+WorkerTimeline::AppendEvent(Event&& Evt)
+{
+ m_EventsLock.WithExclusiveLock([&] {
+ while (m_Events.size() >= m_MaxEvents)
+ {
+ m_Events.pop_front();
+ }
+
+ m_Events.push_back(std::move(Evt));
+ });
+}
+
+const char*
+WorkerTimeline::ToString(EventType Type)
+{
+ switch (Type)
+ {
+ case EventType::WorkerProvisioned:
+ return "provisioned";
+ case EventType::WorkerDeprovisioned:
+ return "deprovisioned";
+ case EventType::ActionAccepted:
+ return "accepted";
+ case EventType::ActionRejected:
+ return "rejected";
+ case EventType::ActionStateChanged:
+ return "state_changed";
+ default:
+ return "unknown";
+ }
+}
+
+static WorkerTimeline::EventType
+EventTypeFromString(std::string_view Str)
+{
+ if (Str == "provisioned")
+ return WorkerTimeline::EventType::WorkerProvisioned;
+ if (Str == "deprovisioned")
+ return WorkerTimeline::EventType::WorkerDeprovisioned;
+ if (Str == "accepted")
+ return WorkerTimeline::EventType::ActionAccepted;
+ if (Str == "rejected")
+ return WorkerTimeline::EventType::ActionRejected;
+ if (Str == "state_changed")
+ return WorkerTimeline::EventType::ActionStateChanged;
+ return WorkerTimeline::EventType::WorkerProvisioned;
+}
+
+void
+WorkerTimeline::WriteTo(const std::filesystem::path& Path) const
+{
+ CbObjectWriter Cbo;
+ Cbo << "worker_id" << m_WorkerId;
+
+ m_EventsLock.WithSharedLock([&] {
+ if (!m_Events.empty())
+ {
+ Cbo.AddDateTime("time_first", m_Events.front().Timestamp);
+ Cbo.AddDateTime("time_last", m_Events.back().Timestamp);
+ }
+
+ Cbo.BeginArray("events");
+ for (const auto& Evt : m_Events)
+ {
+ Cbo.BeginObject();
+ Cbo << "type" << ToString(Evt.Type);
+ Cbo.AddDateTime("ts", Evt.Timestamp);
+
+ if (Evt.ActionLsn != 0)
+ {
+ Cbo << "lsn" << Evt.ActionLsn;
+ Cbo << "action_id" << Evt.ActionId;
+ }
+
+ if (Evt.Type == EventType::ActionStateChanged)
+ {
+ Cbo << "prev_state" << static_cast<int32_t>(Evt.PreviousState);
+ Cbo << "state" << static_cast<int32_t>(Evt.ActionState);
+ }
+
+ if (!Evt.Reason.empty())
+ {
+ Cbo << "reason" << std::string_view(Evt.Reason);
+ }
+
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+ });
+
+ CbObject Obj = Cbo.Save();
+
+ BasicFile File(Path, BasicFile::Mode::kTruncate);
+ File.Write(Obj.GetBuffer().GetView(), 0);
+}
+
+void
+WorkerTimeline::ReadFrom(const std::filesystem::path& Path)
+{
+ CbObjectFromFile Loaded = LoadCompactBinaryObject(Path);
+ CbObject Root = std::move(Loaded.Object);
+
+ if (!Root)
+ {
+ return;
+ }
+
+ std::deque<Event> LoadedEvents;
+
+ for (CbFieldView Field : Root["events"].AsArrayView())
+ {
+ CbObjectView EventObj = Field.AsObjectView();
+
+ Event Evt;
+ Evt.Type = EventTypeFromString(EventObj["type"].AsString());
+ Evt.Timestamp = EventObj["ts"].AsDateTime();
+
+ Evt.ActionLsn = EventObj["lsn"].AsInt32();
+ Evt.ActionId = EventObj["action_id"].AsHash();
+
+ if (Evt.Type == EventType::ActionStateChanged)
+ {
+ Evt.PreviousState = static_cast<RunnerAction::State>(EventObj["prev_state"].AsInt32());
+ Evt.ActionState = static_cast<RunnerAction::State>(EventObj["state"].AsInt32());
+ }
+
+ std::string_view Reason = EventObj["reason"].AsString();
+ if (!Reason.empty())
+ {
+ Evt.Reason = std::string(Reason);
+ }
+
+ LoadedEvents.push_back(std::move(Evt));
+ }
+
+ m_EventsLock.WithExclusiveLock([&] { m_Events = std::move(LoadedEvents); });
+}
+
+WorkerTimeline::TimeRange
+WorkerTimeline::ReadTimeRange(const std::filesystem::path& Path)
+{
+ CbObjectFromFile Loaded = LoadCompactBinaryObject(Path);
+
+ if (!Loaded.Object)
+ {
+ return {};
+ }
+
+ return {
+ .First = Loaded.Object["time_first"].AsDateTime(),
+ .Last = Loaded.Object["time_last"].AsDateTime(),
+ };
+}
+
+// WorkerTimelineStore
+
+static constexpr std::string_view kTimelineExtension = ".ztimeline";
+
+WorkerTimelineStore::WorkerTimelineStore(std::filesystem::path PersistenceDir) : m_PersistenceDir(std::move(PersistenceDir))
+{
+ std::error_code Ec;
+ std::filesystem::create_directories(m_PersistenceDir, Ec);
+}
+
+Ref<WorkerTimeline>
+WorkerTimelineStore::GetOrCreate(std::string_view WorkerId)
+{
+ // Fast path: check if it already exists in memory
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ auto It = m_Timelines.find(std::string(WorkerId));
+ if (It != m_Timelines.end())
+ {
+ return It->second;
+ }
+ }
+
+ // Slow path: create under exclusive lock, loading from disk if available
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ auto& Entry = m_Timelines[std::string(WorkerId)];
+ if (!Entry)
+ {
+ Entry = Ref<WorkerTimeline>(new WorkerTimeline(WorkerId));
+
+ std::filesystem::path Path = TimelinePath(WorkerId);
+ std::error_code Ec;
+ if (std::filesystem::is_regular_file(Path, Ec))
+ {
+ Entry->ReadFrom(Path);
+ }
+ }
+ return Entry;
+}
+
+Ref<WorkerTimeline>
+WorkerTimelineStore::Find(std::string_view WorkerId)
+{
+ RwLock::SharedLockScope _(m_Lock);
+ auto It = m_Timelines.find(std::string(WorkerId));
+ if (It != m_Timelines.end())
+ {
+ return It->second;
+ }
+ return {};
+}
+
+std::vector<std::string>
+WorkerTimelineStore::GetActiveWorkerIds() const
+{
+ std::vector<std::string> Result;
+
+ RwLock::SharedLockScope $(m_Lock);
+ Result.reserve(m_Timelines.size());
+ for (const auto& [Id, _] : m_Timelines)
+ {
+ Result.push_back(Id);
+ }
+
+ return Result;
+}
+
+std::vector<WorkerTimelineStore::WorkerTimelineInfo>
+WorkerTimelineStore::GetAllWorkerInfo() const
+{
+ std::unordered_map<std::string, WorkerTimeline::TimeRange> InfoMap;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ for (const auto& [Id, Timeline] : m_Timelines)
+ {
+ InfoMap[Id] = Timeline->GetTimeRange();
+ }
+ }
+
+ std::error_code Ec;
+ for (const auto& Entry : std::filesystem::directory_iterator(m_PersistenceDir, Ec))
+ {
+ if (!Entry.is_regular_file())
+ {
+ continue;
+ }
+
+ const auto& Path = Entry.path();
+ if (Path.extension().string() != kTimelineExtension)
+ {
+ continue;
+ }
+
+ std::string Id = Path.stem().string();
+ if (InfoMap.find(Id) == InfoMap.end())
+ {
+ InfoMap[Id] = WorkerTimeline::ReadTimeRange(Path);
+ }
+ }
+
+ std::vector<WorkerTimelineInfo> Result;
+ Result.reserve(InfoMap.size());
+ for (auto& [Id, Range] : InfoMap)
+ {
+ Result.push_back({.WorkerId = std::move(Id), .Range = Range});
+ }
+ return Result;
+}
+
+void
+WorkerTimelineStore::Save(std::string_view WorkerId)
+{
+ RwLock::SharedLockScope _(m_Lock);
+ auto It = m_Timelines.find(std::string(WorkerId));
+ if (It != m_Timelines.end())
+ {
+ It->second->WriteTo(TimelinePath(WorkerId));
+ }
+}
+
+void
+WorkerTimelineStore::SaveAll()
+{
+ RwLock::SharedLockScope _(m_Lock);
+ for (const auto& [Id, Timeline] : m_Timelines)
+ {
+ Timeline->WriteTo(TimelinePath(Id));
+ }
+}
+
+std::filesystem::path
+WorkerTimelineStore::TimelinePath(std::string_view WorkerId) const
+{
+ return m_PersistenceDir / (std::string(WorkerId) + std::string(kTimelineExtension));
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/timeline/workertimeline.h b/src/zencompute/timeline/workertimeline.h
new file mode 100644
index 000000000..87e19bc28
--- /dev/null
+++ b/src/zencompute/timeline/workertimeline.h
@@ -0,0 +1,169 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "../runners/functionrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zenbase/refcount.h>
+# include <zencore/compactbinary.h>
+# include <zencore/iohash.h>
+# include <zencore/thread.h>
+# include <zencore/timer.h>
+
+# include <deque>
+# include <filesystem>
+# include <string>
+# include <string_view>
+# include <unordered_map>
+# include <vector>
+
+namespace zen::compute {
+
+struct RunnerAction;
+
+/** Worker activity timeline for tracking and visualizing worker activity over time.
+ *
+ * Records worker lifecycle events (provisioning/deprovisioning) and action lifecycle
+ * events (accept, reject, state changes) with timestamps, enabling time-range queries
+ * for dashboard visualization.
+ */
+class WorkerTimeline : public RefCounted
+{
+public:
+ explicit WorkerTimeline(std::string_view WorkerId);
+ ~WorkerTimeline() override;
+
+ struct TimeRange
+ {
+ DateTime First = DateTime(0);
+ DateTime Last = DateTime(0);
+
+ explicit operator bool() const { return First.GetTicks() != 0; }
+ };
+
+ enum class EventType
+ {
+ WorkerProvisioned,
+ WorkerDeprovisioned,
+ ActionAccepted,
+ ActionRejected,
+ ActionStateChanged
+ };
+
+ static const char* ToString(EventType Type);
+
+ struct Event
+ {
+ EventType Type;
+ DateTime Timestamp = DateTime(0);
+
+ // Action context (only set for action events)
+ int ActionLsn = 0;
+ IoHash ActionId;
+ RunnerAction::State ActionState = RunnerAction::State::New;
+ RunnerAction::State PreviousState = RunnerAction::State::New;
+
+ // Optional reason (e.g. rejection reason)
+ std::string Reason;
+ };
+
+ /** Record that this worker has been provisioned and is available for work. */
+ void RecordProvisioned();
+
+ /** Record that this worker has been deprovisioned and is no longer available. */
+ void RecordDeprovisioned();
+
+ /** Record that an action was accepted by this worker. */
+ void RecordActionAccepted(int ActionLsn, const IoHash& ActionId);
+
+ /** Record that an action was rejected by this worker. */
+ void RecordActionRejected(int ActionLsn, const IoHash& ActionId, std::string_view Reason);
+
+ /** Record an action state transition on this worker. */
+ void RecordActionStateChanged(int ActionLsn, const IoHash& ActionId, RunnerAction::State PreviousState, RunnerAction::State NewState);
+
+ /** Query events within a time range (inclusive). Returns events ordered by timestamp. */
+ [[nodiscard]] std::vector<Event> QueryTimeline(DateTime StartTime, DateTime EndTime) const;
+
+ /** Query the most recent N events. */
+ [[nodiscard]] std::vector<Event> QueryRecent(int Limit = 100) const;
+
+ /** Return the total number of recorded events. */
+ [[nodiscard]] size_t GetEventCount() const;
+
+ /** Return the time range covered by the events in this timeline. */
+ [[nodiscard]] TimeRange GetTimeRange() const;
+
+ [[nodiscard]] const std::string& GetWorkerId() const { return m_WorkerId; }
+
+ /** Write the timeline to a file at the given path. */
+ void WriteTo(const std::filesystem::path& Path) const;
+
+ /** Read the timeline from a file at the given path. Replaces current in-memory events. */
+ void ReadFrom(const std::filesystem::path& Path);
+
+ /** Read only the time range from a persisted timeline file, without loading events. */
+ [[nodiscard]] static TimeRange ReadTimeRange(const std::filesystem::path& Path);
+
+private:
+ void AppendEvent(Event&& Evt);
+
+ std::string m_WorkerId;
+ mutable RwLock m_EventsLock;
+ std::deque<Event> m_Events;
+ size_t m_MaxEvents = 10'000;
+};
+
+/** Manages a set of WorkerTimeline instances, keyed by worker ID.
+ *
+ * Provides thread-safe lookup and on-demand creation of timelines, backed by
+ * a persistence directory. Each timeline is stored as a separate file named
+ * {WorkerId}.ztimeline within the directory.
+ */
+class WorkerTimelineStore
+{
+public:
+ explicit WorkerTimelineStore(std::filesystem::path PersistenceDir);
+ ~WorkerTimelineStore() = default;
+
+ WorkerTimelineStore(const WorkerTimelineStore&) = delete;
+ WorkerTimelineStore& operator=(const WorkerTimelineStore&) = delete;
+
+ /** Get the timeline for a worker, creating one if it does not exist.
+ * If a persisted file exists on disk it will be loaded on first access. */
+ Ref<WorkerTimeline> GetOrCreate(std::string_view WorkerId);
+
+ /** Get the timeline for a worker, or null ref if it does not exist in memory. */
+ [[nodiscard]] Ref<WorkerTimeline> Find(std::string_view WorkerId);
+
+ /** Return the worker IDs of currently loaded (in-memory) timelines. */
+ [[nodiscard]] std::vector<std::string> GetActiveWorkerIds() const;
+
+ struct WorkerTimelineInfo
+ {
+ std::string WorkerId;
+ WorkerTimeline::TimeRange Range;
+ };
+
+ /** Return info for all known timelines (in-memory and on-disk), including time range. */
+ [[nodiscard]] std::vector<WorkerTimelineInfo> GetAllWorkerInfo() const;
+
+ /** Persist a single worker's timeline to disk. */
+ void Save(std::string_view WorkerId);
+
+ /** Persist all in-memory timelines to disk. */
+ void SaveAll();
+
+private:
+ [[nodiscard]] std::filesystem::path TimelinePath(std::string_view WorkerId) const;
+
+ std::filesystem::path m_PersistenceDir;
+ mutable RwLock m_Lock;
+ std::unordered_map<std::string, Ref<WorkerTimeline>> m_Timelines;
+};
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES