aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/timeline/workertimeline.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/timeline/workertimeline.h')
-rw-r--r--src/zencompute/timeline/workertimeline.h169
1 files changed, 169 insertions, 0 deletions
diff --git a/src/zencompute/timeline/workertimeline.h b/src/zencompute/timeline/workertimeline.h
new file mode 100644
index 000000000..87e19bc28
--- /dev/null
+++ b/src/zencompute/timeline/workertimeline.h
@@ -0,0 +1,169 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "../runners/functionrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zenbase/refcount.h>
+# include <zencore/compactbinary.h>
+# include <zencore/iohash.h>
+# include <zencore/thread.h>
+# include <zencore/timer.h>
+
+# include <deque>
+# include <filesystem>
+# include <string>
+# include <string_view>
+# include <unordered_map>
+# include <vector>
+
+namespace zen::compute {
+
+struct RunnerAction;
+
+/** Worker activity timeline for tracking and visualizing worker activity over time.
+ *
+ * Records worker lifecycle events (provisioning/deprovisioning) and action lifecycle
+ * events (accept, reject, state changes) with timestamps, enabling time-range queries
+ * for dashboard visualization.
+ */
+class WorkerTimeline : public RefCounted
+{
+public:
+ explicit WorkerTimeline(std::string_view WorkerId);
+ ~WorkerTimeline() override;
+
+ struct TimeRange
+ {
+ DateTime First = DateTime(0);
+ DateTime Last = DateTime(0);
+
+ explicit operator bool() const { return First.GetTicks() != 0; }
+ };
+
+ enum class EventType
+ {
+ WorkerProvisioned,
+ WorkerDeprovisioned,
+ ActionAccepted,
+ ActionRejected,
+ ActionStateChanged
+ };
+
+ static const char* ToString(EventType Type);
+
+ struct Event
+ {
+ EventType Type;
+ DateTime Timestamp = DateTime(0);
+
+ // Action context (only set for action events)
+ int ActionLsn = 0;
+ IoHash ActionId;
+ RunnerAction::State ActionState = RunnerAction::State::New;
+ RunnerAction::State PreviousState = RunnerAction::State::New;
+
+ // Optional reason (e.g. rejection reason)
+ std::string Reason;
+ };
+
+ /** Record that this worker has been provisioned and is available for work. */
+ void RecordProvisioned();
+
+ /** Record that this worker has been deprovisioned and is no longer available. */
+ void RecordDeprovisioned();
+
+ /** Record that an action was accepted by this worker. */
+ void RecordActionAccepted(int ActionLsn, const IoHash& ActionId);
+
+ /** Record that an action was rejected by this worker. */
+ void RecordActionRejected(int ActionLsn, const IoHash& ActionId, std::string_view Reason);
+
+ /** Record an action state transition on this worker. */
+ void RecordActionStateChanged(int ActionLsn, const IoHash& ActionId, RunnerAction::State PreviousState, RunnerAction::State NewState);
+
+ /** Query events within a time range (inclusive). Returns events ordered by timestamp. */
+ [[nodiscard]] std::vector<Event> QueryTimeline(DateTime StartTime, DateTime EndTime) const;
+
+ /** Query the most recent N events. */
+ [[nodiscard]] std::vector<Event> QueryRecent(int Limit = 100) const;
+
+ /** Return the total number of recorded events. */
+ [[nodiscard]] size_t GetEventCount() const;
+
+ /** Return the time range covered by the events in this timeline. */
+ [[nodiscard]] TimeRange GetTimeRange() const;
+
+ [[nodiscard]] const std::string& GetWorkerId() const { return m_WorkerId; }
+
+ /** Write the timeline to a file at the given path. */
+ void WriteTo(const std::filesystem::path& Path) const;
+
+ /** Read the timeline from a file at the given path. Replaces current in-memory events. */
+ void ReadFrom(const std::filesystem::path& Path);
+
+ /** Read only the time range from a persisted timeline file, without loading events. */
+ [[nodiscard]] static TimeRange ReadTimeRange(const std::filesystem::path& Path);
+
+private:
+ void AppendEvent(Event&& Evt);
+
+ std::string m_WorkerId;
+ mutable RwLock m_EventsLock;
+ std::deque<Event> m_Events;
+ size_t m_MaxEvents = 10'000;
+};
+
+/** Manages a set of WorkerTimeline instances, keyed by worker ID.
+ *
+ * Provides thread-safe lookup and on-demand creation of timelines, backed by
+ * a persistence directory. Each timeline is stored as a separate file named
+ * {WorkerId}.ztimeline within the directory.
+ */
+class WorkerTimelineStore
+{
+public:
+ explicit WorkerTimelineStore(std::filesystem::path PersistenceDir);
+ ~WorkerTimelineStore() = default;
+
+ WorkerTimelineStore(const WorkerTimelineStore&) = delete;
+ WorkerTimelineStore& operator=(const WorkerTimelineStore&) = delete;
+
+ /** Get the timeline for a worker, creating one if it does not exist.
+ * If a persisted file exists on disk it will be loaded on first access. */
+ Ref<WorkerTimeline> GetOrCreate(std::string_view WorkerId);
+
+ /** Get the timeline for a worker, or null ref if it does not exist in memory. */
+ [[nodiscard]] Ref<WorkerTimeline> Find(std::string_view WorkerId);
+
+ /** Return the worker IDs of currently loaded (in-memory) timelines. */
+ [[nodiscard]] std::vector<std::string> GetActiveWorkerIds() const;
+
+ struct WorkerTimelineInfo
+ {
+ std::string WorkerId;
+ WorkerTimeline::TimeRange Range;
+ };
+
+ /** Return info for all known timelines (in-memory and on-disk), including time range. */
+ [[nodiscard]] std::vector<WorkerTimelineInfo> GetAllWorkerInfo() const;
+
+ /** Persist a single worker's timeline to disk. */
+ void Save(std::string_view WorkerId);
+
+ /** Persist all in-memory timelines to disk. */
+ void SaveAll();
+
+private:
+ [[nodiscard]] std::filesystem::path TimelinePath(std::string_view WorkerId) const;
+
+ std::filesystem::path m_PersistenceDir;
+ mutable RwLock m_Lock;
+ std::unordered_map<std::string, Ref<WorkerTimeline>> m_Timelines;
+};
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES