1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include "../runners/functionrunner.h"
#if ZEN_WITH_COMPUTE_SERVICES
# include <zenbase/refcount.h>
# include <zencore/compactbinary.h>
# include <zencore/iohash.h>
# include <zencore/thread.h>
# include <zencore/timer.h>
# include <deque>
# include <filesystem>
# include <string>
# include <string_view>
# include <unordered_map>
# include <vector>
namespace zen::compute {
// Forward declaration of the runner action type. Note that WorkerTimeline::Event stores
// RunnerAction::State by value, which needs the complete definition of RunnerAction -
// presumably supplied by the functionrunner.h include at the top of this file (TODO confirm).
struct RunnerAction;
/** Worker activity timeline for tracking and visualizing worker activity over time.
 *
 * Records worker lifecycle events (provisioning/deprovisioning) and action lifecycle
 * events (accept, reject, state changes) with timestamps, enabling time-range queries
 * for dashboard visualization.
 *
 * Events are kept in memory (m_Events) and can be written to / read from a file with
 * WriteTo() / ReadFrom().
 *
 * NOTE(review): the presence of m_EventsLock suggests the public methods are intended to
 * be callable from multiple threads - confirm against the implementation.
 */
class WorkerTimeline : public RefCounted
{
public:
    /** Create an empty timeline for the worker identified by WorkerId. */
    explicit WorkerTimeline(std::string_view WorkerId);
    ~WorkerTimeline() override;

    /** Pair of timestamps delimiting a span of recorded events.
     *
     * Both ends default to DateTime(0); the range converts to true only when First has a
     * non-zero tick count, so a default-constructed range tests as false.
     */
    struct TimeRange
    {
        DateTime First = DateTime(0);
        DateTime Last  = DateTime(0);

        explicit operator bool() const { return First.GetTicks() != 0; }
    };

    /** Kind of occurrence stored in an Event. */
    enum class EventType
    {
        WorkerProvisioned,
        WorkerDeprovisioned,
        ActionAccepted,
        ActionRejected,
        ActionStateChanged
    };

    /** Return a C-string name for the given event type. */
    static const char* ToString(EventType Type);

    /** A single timeline entry.
     *
     * Every member has a default so that a default-constructed Event is fully initialized.
     */
    struct Event
    {
        // Defaults to the first enumerator (the value zero-initialization would produce)
        // so that the member is never left indeterminate.
        EventType Type      = EventType::WorkerProvisioned;
        DateTime  Timestamp = DateTime(0);

        // Action context (only set for action events)
        int                 ActionLsn = 0;
        IoHash              ActionId;
        RunnerAction::State ActionState   = RunnerAction::State::New;
        RunnerAction::State PreviousState = RunnerAction::State::New;

        // Optional reason (e.g. rejection reason)
        std::string Reason;
    };

    /** Record that this worker has been provisioned and is available for work. */
    void RecordProvisioned();

    /** Record that this worker has been deprovisioned and is no longer available. */
    void RecordDeprovisioned();

    /** Record that an action was accepted by this worker. */
    void RecordActionAccepted(int ActionLsn, const IoHash& ActionId);

    /** Record that an action was rejected by this worker. Reason is free-form text. */
    void RecordActionRejected(int ActionLsn, const IoHash& ActionId, std::string_view Reason);

    /** Record an action state transition (PreviousState -> NewState) on this worker. */
    void RecordActionStateChanged(int ActionLsn, const IoHash& ActionId, RunnerAction::State PreviousState, RunnerAction::State NewState);

    /** Query events within a time range (inclusive). Returns events ordered by timestamp. */
    [[nodiscard]] std::vector<Event> QueryTimeline(DateTime StartTime, DateTime EndTime) const;

    /** Query the most recent N events. */
    [[nodiscard]] std::vector<Event> QueryRecent(int Limit = 100) const;

    /** Return the total number of recorded events. */
    [[nodiscard]] size_t GetEventCount() const;

    /** Return the time range covered by the events in this timeline. */
    [[nodiscard]] TimeRange GetTimeRange() const;

    /** Return the worker ID this timeline was constructed with. */
    [[nodiscard]] const std::string& GetWorkerId() const { return m_WorkerId; }

    /** Write the timeline to a file at the given path. */
    void WriteTo(const std::filesystem::path& Path) const;

    /** Read the timeline from a file at the given path. Replaces current in-memory events. */
    void ReadFrom(const std::filesystem::path& Path);

    /** Read only the time range from a persisted timeline file, without loading events. */
    [[nodiscard]] static TimeRange ReadTimeRange(const std::filesystem::path& Path);

private:
    /** Add an event to m_Events, taking ownership of Evt. */
    void AppendEvent(Event&& Evt);

    std::string       m_WorkerId;
    mutable RwLock    m_EventsLock;  // mutable so const query methods can take the lock
    std::deque<Event> m_Events;
    // NOTE(review): presumably the retention cap applied when appending - confirm in the .cpp
    size_t m_MaxEvents = 10'000;
};
/** Registry of WorkerTimeline instances indexed by worker ID.
 *
 * Offers thread-safe lookup and on-demand creation of timelines. The store is backed by
 * a persistence directory in which every timeline lives in its own file, named
 * {WorkerId}.ztimeline.
 */
class WorkerTimelineStore
{
public:
    /** Summary of one timeline: the worker it belongs to and the time range it covers. */
    struct WorkerTimelineInfo
    {
        std::string               WorkerId;
        WorkerTimeline::TimeRange Range;
    };

    explicit WorkerTimelineStore(std::filesystem::path PersistenceDir);
    ~WorkerTimelineStore() = default;

    // The store is not copyable.
    WorkerTimelineStore(const WorkerTimelineStore&) = delete;
    WorkerTimelineStore& operator=(const WorkerTimelineStore&) = delete;

    // --- Lookup -----------------------------------------------------------------------

    /** Return the timeline for WorkerId, creating it when none exists yet.
     *  On first access, a persisted file found on disk is loaded. */
    Ref<WorkerTimeline> GetOrCreate(std::string_view WorkerId);

    /** Return the timeline for WorkerId, or a null ref when it is not held in memory. */
    [[nodiscard]] Ref<WorkerTimeline> Find(std::string_view WorkerId);

    // --- Enumeration ------------------------------------------------------------------

    /** List the worker IDs whose timelines are currently loaded (in memory). */
    [[nodiscard]] std::vector<std::string> GetActiveWorkerIds() const;

    /** List every known timeline (in-memory and on-disk) together with its time range. */
    [[nodiscard]] std::vector<WorkerTimelineInfo> GetAllWorkerInfo() const;

    // --- Persistence ------------------------------------------------------------------

    /** Write the timeline of a single worker to disk. */
    void Save(std::string_view WorkerId);

    /** Write every in-memory timeline to disk. */
    void SaveAll();

private:
    /** Path of the timeline file that belongs to WorkerId. */
    [[nodiscard]] std::filesystem::path TimelinePath(std::string_view WorkerId) const;

    std::filesystem::path                                m_PersistenceDir;
    mutable RwLock                                       m_Lock;
    std::unordered_map<std::string, Ref<WorkerTimeline>> m_Timelines;
};
} // namespace zen::compute
#endif // ZEN_WITH_COMPUTE_SERVICES
|