// Copyright Epic Games, Inc. All Rights Reserved. #include "workertimeline.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include namespace zen::compute { WorkerTimeline::WorkerTimeline(std::string_view WorkerId) : m_WorkerId(WorkerId) { } WorkerTimeline::~WorkerTimeline() { } void WorkerTimeline::RecordProvisioned() { AppendEvent({ .Type = EventType::WorkerProvisioned, .Timestamp = DateTime::Now(), }); } void WorkerTimeline::RecordDeprovisioned() { AppendEvent({ .Type = EventType::WorkerDeprovisioned, .Timestamp = DateTime::Now(), }); } void WorkerTimeline::RecordActionAccepted(int ActionLsn, const IoHash& ActionId) { AppendEvent({ .Type = EventType::ActionAccepted, .Timestamp = DateTime::Now(), .ActionLsn = ActionLsn, .ActionId = ActionId, }); } void WorkerTimeline::RecordActionRejected(int ActionLsn, const IoHash& ActionId, std::string_view Reason) { AppendEvent({ .Type = EventType::ActionRejected, .Timestamp = DateTime::Now(), .ActionLsn = ActionLsn, .ActionId = ActionId, .Reason = std::string(Reason), }); } void WorkerTimeline::RecordActionStateChanged(int ActionLsn, const IoHash& ActionId, RunnerAction::State PreviousState, RunnerAction::State NewState) { AppendEvent({ .Type = EventType::ActionStateChanged, .Timestamp = DateTime::Now(), .ActionLsn = ActionLsn, .ActionId = ActionId, .ActionState = NewState, .PreviousState = PreviousState, }); } std::vector WorkerTimeline::QueryTimeline(DateTime StartTime, DateTime EndTime) const { std::vector Result; m_EventsLock.WithSharedLock([&] { for (const auto& Evt : m_Events) { if (Evt.Timestamp >= StartTime && Evt.Timestamp <= EndTime) { Result.push_back(Evt); } } }); return Result; } std::vector WorkerTimeline::QueryRecent(int Limit) const { std::vector Result; m_EventsLock.WithSharedLock([&] { const int Count = std::min(Limit, gsl::narrow(m_Events.size())); auto It = m_Events.end() - Count; Result.assign(It, m_Events.end()); }); return Result; } size_t WorkerTimeline::GetEventCount() const { size_t Count = 0; m_EventsLock.WithSharedLock([&] { Count = m_Events.size(); }); return Count; } WorkerTimeline::TimeRange WorkerTimeline::GetTimeRange() const { TimeRange Range; m_EventsLock.WithSharedLock([&] { if (!m_Events.empty()) { Range.First = m_Events.front().Timestamp; Range.Last = m_Events.back().Timestamp; } }); return Range; } void WorkerTimeline::AppendEvent(Event&& Evt) { m_EventsLock.WithExclusiveLock([&] { while (m_Events.size() >= m_MaxEvents) { m_Events.pop_front(); } m_Events.push_back(std::move(Evt)); }); } const char* WorkerTimeline::ToString(EventType Type) { switch (Type) { case EventType::WorkerProvisioned: return "provisioned"; case EventType::WorkerDeprovisioned: return "deprovisioned"; case EventType::ActionAccepted: return "accepted"; case EventType::ActionRejected: return "rejected"; case EventType::ActionStateChanged: return "state_changed"; default: return "unknown"; } } static WorkerTimeline::EventType EventTypeFromString(std::string_view Str) { if (Str == "provisioned") return WorkerTimeline::EventType::WorkerProvisioned; if (Str == "deprovisioned") return WorkerTimeline::EventType::WorkerDeprovisioned; if (Str == "accepted") return WorkerTimeline::EventType::ActionAccepted; if (Str == "rejected") return WorkerTimeline::EventType::ActionRejected; if (Str == "state_changed") return WorkerTimeline::EventType::ActionStateChanged; return WorkerTimeline::EventType::WorkerProvisioned; } void WorkerTimeline::WriteTo(const std::filesystem::path& Path) const { CbObjectWriter Cbo; Cbo << "worker_id" << m_WorkerId; m_EventsLock.WithSharedLock([&] { if (!m_Events.empty()) { Cbo.AddDateTime("time_first", m_Events.front().Timestamp); Cbo.AddDateTime("time_last", m_Events.back().Timestamp); } Cbo.BeginArray("events"); for (const auto& Evt : m_Events) { Cbo.BeginObject(); Cbo << "type" << ToString(Evt.Type); Cbo.AddDateTime("ts", Evt.Timestamp); if (Evt.ActionLsn != 0) { Cbo << "lsn" << Evt.ActionLsn; Cbo << "action_id" << Evt.ActionId; } if (Evt.Type == EventType::ActionStateChanged) { Cbo << "prev_state" << static_cast(Evt.PreviousState); Cbo << "state" << static_cast(Evt.ActionState); } if (!Evt.Reason.empty()) { Cbo << "reason" << std::string_view(Evt.Reason); } Cbo.EndObject(); } Cbo.EndArray(); }); CbObject Obj = Cbo.Save(); BasicFile File(Path, BasicFile::Mode::kTruncate); File.Write(Obj.GetBuffer().GetView(), 0); } void WorkerTimeline::ReadFrom(const std::filesystem::path& Path) { CbObjectFromFile Loaded = LoadCompactBinaryObject(Path); CbObject Root = std::move(Loaded.Object); if (!Root) { return; } std::deque LoadedEvents; for (CbFieldView Field : Root["events"].AsArrayView()) { CbObjectView EventObj = Field.AsObjectView(); Event Evt; Evt.Type = EventTypeFromString(EventObj["type"].AsString()); Evt.Timestamp = EventObj["ts"].AsDateTime(); Evt.ActionLsn = EventObj["lsn"].AsInt32(); Evt.ActionId = EventObj["action_id"].AsHash(); if (Evt.Type == EventType::ActionStateChanged) { Evt.PreviousState = static_cast(EventObj["prev_state"].AsInt32()); Evt.ActionState = static_cast(EventObj["state"].AsInt32()); } std::string_view Reason = EventObj["reason"].AsString(); if (!Reason.empty()) { Evt.Reason = std::string(Reason); } LoadedEvents.push_back(std::move(Evt)); } m_EventsLock.WithExclusiveLock([&] { m_Events = std::move(LoadedEvents); }); } WorkerTimeline::TimeRange WorkerTimeline::ReadTimeRange(const std::filesystem::path& Path) { CbObjectFromFile Loaded = LoadCompactBinaryObject(Path); if (!Loaded.Object) { return {}; } return { .First = Loaded.Object["time_first"].AsDateTime(), .Last = Loaded.Object["time_last"].AsDateTime(), }; } // WorkerTimelineStore static constexpr std::string_view kTimelineExtension = ".ztimeline"; WorkerTimelineStore::WorkerTimelineStore(std::filesystem::path PersistenceDir) : m_PersistenceDir(std::move(PersistenceDir)) { std::error_code Ec; std::filesystem::create_directories(m_PersistenceDir, Ec); } Ref WorkerTimelineStore::GetOrCreate(std::string_view WorkerId) { // Fast path: check if it already exists in memory { RwLock::SharedLockScope _(m_Lock); auto It = m_Timelines.find(std::string(WorkerId)); if (It != m_Timelines.end()) { return It->second; } } // Slow path: create under exclusive lock, loading from disk if available RwLock::ExclusiveLockScope _(m_Lock); auto& Entry = m_Timelines[std::string(WorkerId)]; if (!Entry) { Entry = Ref(new WorkerTimeline(WorkerId)); std::filesystem::path Path = TimelinePath(WorkerId); std::error_code Ec; if (std::filesystem::is_regular_file(Path, Ec)) { Entry->ReadFrom(Path); } } return Entry; } Ref WorkerTimelineStore::Find(std::string_view WorkerId) { RwLock::SharedLockScope _(m_Lock); auto It = m_Timelines.find(std::string(WorkerId)); if (It != m_Timelines.end()) { return It->second; } return {}; } std::vector WorkerTimelineStore::GetActiveWorkerIds() const { std::vector Result; RwLock::SharedLockScope $(m_Lock); Result.reserve(m_Timelines.size()); for (const auto& [Id, _] : m_Timelines) { Result.push_back(Id); } return Result; } std::vector WorkerTimelineStore::GetAllWorkerInfo() const { std::unordered_map InfoMap; { RwLock::SharedLockScope _(m_Lock); for (const auto& [Id, Timeline] : m_Timelines) { InfoMap[Id] = Timeline->GetTimeRange(); } } std::error_code Ec; for (const auto& Entry : std::filesystem::directory_iterator(m_PersistenceDir, Ec)) { if (!Entry.is_regular_file()) { continue; } const auto& Path = Entry.path(); if (Path.extension().string() != kTimelineExtension) { continue; } std::string Id = Path.stem().string(); if (InfoMap.find(Id) == InfoMap.end()) { InfoMap[Id] = WorkerTimeline::ReadTimeRange(Path); } } std::vector Result; Result.reserve(InfoMap.size()); for (auto& [Id, Range] : InfoMap) { Result.push_back({.WorkerId = std::move(Id), .Range = Range}); } return Result; } void WorkerTimelineStore::Save(std::string_view WorkerId) { RwLock::SharedLockScope _(m_Lock); auto It = m_Timelines.find(std::string(WorkerId)); if (It != m_Timelines.end()) { It->second->WriteTo(TimelinePath(WorkerId)); } } void WorkerTimelineStore::SaveAll() { RwLock::SharedLockScope _(m_Lock); for (const auto& [Id, Timeline] : m_Timelines) { Timeline->WriteTo(TimelinePath(Id)); } } std::filesystem::path WorkerTimelineStore::TimelinePath(std::string_view WorkerId) const { return m_PersistenceDir / (std::string(WorkerId) + std::string(kTimelineExtension)); } } // namespace zen::compute #endif // ZEN_WITH_COMPUTE_SERVICES