// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include "../runners/functionrunner.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include # include # include # include namespace zen::compute { struct RunnerAction; /** Worker activity timeline for tracking and visualizing worker activity over time. * * Records worker lifecycle events (provisioning/deprovisioning) and action lifecycle * events (accept, reject, state changes) with timestamps, enabling time-range queries * for dashboard visualization. */ class WorkerTimeline : public RefCounted { public: explicit WorkerTimeline(std::string_view WorkerId); ~WorkerTimeline() override; struct TimeRange { DateTime First = DateTime(0); DateTime Last = DateTime(0); explicit operator bool() const { return First.GetTicks() != 0; } }; enum class EventType { WorkerProvisioned, WorkerDeprovisioned, ActionAccepted, ActionRejected, ActionStateChanged }; static const char* ToString(EventType Type); struct Event { EventType Type; DateTime Timestamp = DateTime(0); // Action context (only set for action events) int ActionLsn = 0; IoHash ActionId; RunnerAction::State ActionState = RunnerAction::State::New; RunnerAction::State PreviousState = RunnerAction::State::New; // Optional reason (e.g. rejection reason) std::string Reason; }; /** Record that this worker has been provisioned and is available for work. */ void RecordProvisioned(); /** Record that this worker has been deprovisioned and is no longer available. */ void RecordDeprovisioned(); /** Record that an action was accepted by this worker. */ void RecordActionAccepted(int ActionLsn, const IoHash& ActionId); /** Record that an action was rejected by this worker. */ void RecordActionRejected(int ActionLsn, const IoHash& ActionId, std::string_view Reason); /** Record an action state transition on this worker. */ void RecordActionStateChanged(int ActionLsn, const IoHash& ActionId, RunnerAction::State PreviousState, RunnerAction::State NewState); /** Query events within a time range (inclusive). Returns events ordered by timestamp. */ [[nodiscard]] std::vector QueryTimeline(DateTime StartTime, DateTime EndTime) const; /** Query the most recent N events. */ [[nodiscard]] std::vector QueryRecent(int Limit = 100) const; /** Return the total number of recorded events. */ [[nodiscard]] size_t GetEventCount() const; /** Return the time range covered by the events in this timeline. */ [[nodiscard]] TimeRange GetTimeRange() const; [[nodiscard]] const std::string& GetWorkerId() const { return m_WorkerId; } /** Write the timeline to a file at the given path. */ void WriteTo(const std::filesystem::path& Path) const; /** Read the timeline from a file at the given path. Replaces current in-memory events. */ void ReadFrom(const std::filesystem::path& Path); /** Read only the time range from a persisted timeline file, without loading events. */ [[nodiscard]] static TimeRange ReadTimeRange(const std::filesystem::path& Path); private: void AppendEvent(Event&& Evt); std::string m_WorkerId; mutable RwLock m_EventsLock; std::deque m_Events; size_t m_MaxEvents = 10'000; }; /** Manages a set of WorkerTimeline instances, keyed by worker ID. * * Provides thread-safe lookup and on-demand creation of timelines, backed by * a persistence directory. Each timeline is stored as a separate file named * {WorkerId}.ztimeline within the directory. */ class WorkerTimelineStore { public: explicit WorkerTimelineStore(std::filesystem::path PersistenceDir); ~WorkerTimelineStore() = default; WorkerTimelineStore(const WorkerTimelineStore&) = delete; WorkerTimelineStore& operator=(const WorkerTimelineStore&) = delete; /** Get the timeline for a worker, creating one if it does not exist. * If a persisted file exists on disk it will be loaded on first access. */ Ref GetOrCreate(std::string_view WorkerId); /** Get the timeline for a worker, or null ref if it does not exist in memory. */ [[nodiscard]] Ref Find(std::string_view WorkerId); /** Return the worker IDs of currently loaded (in-memory) timelines. */ [[nodiscard]] std::vector GetActiveWorkerIds() const; struct WorkerTimelineInfo { std::string WorkerId; WorkerTimeline::TimeRange Range; }; /** Return info for all known timelines (in-memory and on-disk), including time range. */ [[nodiscard]] std::vector GetAllWorkerInfo() const; /** Persist a single worker's timeline to disk. */ void Save(std::string_view WorkerId); /** Persist all in-memory timelines to disk. */ void SaveAll(); private: [[nodiscard]] std::filesystem::path TimelinePath(std::string_view WorkerId) const; std::filesystem::path m_PersistenceDir; mutable RwLock m_Lock; std::unordered_map> m_Timelines; }; } // namespace zen::compute #endif // ZEN_WITH_COMPUTE_SERVICES