// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { struct ProcessMetricsTracker::Impl { static constexpr size_t kBatchSize = 8; struct Entry { ProcessHandle Handle; ProcessMetrics LastMetrics; float CpuUsagePercent = -1.0f; uint64_t PrevUserTimeMs = 0; uint64_t PrevKernelTimeMs = 0; uint64_t PrevSampleTicks = 0; }; uint64_t m_SampleIntervalMs; mutable RwLock m_Lock; std::unordered_map m_Entries; size_t m_NextSampleIndex = 0; std::vector m_KeyOrder; std::atomic m_Running{false}; // Thread-based sampling std::thread m_Thread; Event m_StopEvent; // Timer-based sampling std::unique_ptr m_Timer; explicit Impl(uint64_t SampleIntervalMs) : m_SampleIntervalMs(SampleIntervalMs) {} Impl(asio::io_context& IoContext, uint64_t SampleIntervalMs) : m_SampleIntervalMs(SampleIntervalMs) , m_Timer(std::make_unique(IoContext)) { } ~Impl() { Stop(); } void Start() { if (m_Running.exchange(true)) { return; } if (m_Timer) { EnqueueTimer(); } else { m_Thread = std::thread([this] { SamplingLoop(); }); } } void Stop() { if (!m_Running.exchange(false)) { return; } if (m_Timer) { m_Timer->cancel(); } if (m_Thread.joinable()) { m_StopEvent.Set(); m_Thread.join(); } } void Add(const ProcessHandle& Handle) { int Pid = Handle.Pid(); RwLock::ExclusiveLockScope $(m_Lock); auto It = m_Entries.find(Pid); if (It != m_Entries.end()) { m_Entries.erase(It); } else { m_KeyOrder.push_back(Pid); } auto [NewIt, Inserted] = m_Entries.try_emplace(Pid); NewIt->second.Handle.Initialize(Pid); } void Remove(int Pid) { RwLock::ExclusiveLockScope $(m_Lock); m_Entries.erase(Pid); m_KeyOrder.erase(std::remove(m_KeyOrder.begin(), m_KeyOrder.end(), Pid), m_KeyOrder.end()); if (m_NextSampleIndex >= m_KeyOrder.size()) { m_NextSampleIndex = 0; } } void Clear() { RwLock::ExclusiveLockScope $(m_Lock); m_Entries.clear(); m_KeyOrder.clear(); m_NextSampleIndex = 0; } std::vector GetSnapshot() const { std::vector Result; RwLock::SharedLockScope $(m_Lock); Result.reserve(m_Entries.size()); for (const auto& [Pid, E] : m_Entries) { TrackedProcessEntry Out; Out.Pid = Pid; Out.Metrics = E.LastMetrics; Out.CpuUsagePercent = E.CpuUsagePercent; Result.push_back(std::move(Out)); } return Result; } AggregateProcessMetrics GetAggregate() const { AggregateProcessMetrics Agg; RwLock::SharedLockScope $(m_Lock); for (const auto& [Pid, E] : m_Entries) { Agg.TotalWorkingSetSize += E.LastMetrics.WorkingSetSize; Agg.TotalPeakWorkingSetSize += E.LastMetrics.PeakWorkingSetSize; Agg.TotalUserTimeMs += E.LastMetrics.UserTimeMs; Agg.TotalKernelTimeMs += E.LastMetrics.KernelTimeMs; Agg.ProcessCount++; } return Agg; } void SampleBatch() { RwLock::SharedLockScope $(m_Lock); if (m_KeyOrder.empty()) { return; } const uint64_t NowTicks = GetHifreqTimerValue(); size_t Remaining = std::min(kBatchSize, m_KeyOrder.size()); while (Remaining > 0) { if (m_NextSampleIndex >= m_KeyOrder.size()) { m_NextSampleIndex = 0; } int Pid = m_KeyOrder[m_NextSampleIndex]; auto It = m_Entries.find(Pid); if (It == m_Entries.end()) { m_NextSampleIndex++; Remaining--; continue; } Entry& E = It->second; ProcessMetrics Metrics; GetProcessMetrics(E.Handle, Metrics); if (E.PrevSampleTicks > 0) { uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(NowTicks - E.PrevSampleTicks); uint64_t DeltaCpuTimeMs = (Metrics.UserTimeMs + Metrics.KernelTimeMs) - (E.PrevUserTimeMs + E.PrevKernelTimeMs); if (ElapsedMs > 0) { E.CpuUsagePercent = static_cast(static_cast(DeltaCpuTimeMs) / ElapsedMs * 100.0); } } E.PrevUserTimeMs = Metrics.UserTimeMs; E.PrevKernelTimeMs = Metrics.KernelTimeMs; E.PrevSampleTicks = NowTicks; E.LastMetrics = Metrics; m_NextSampleIndex++; Remaining--; } } void SamplingLoop() { while (!m_StopEvent.Wait(static_cast(m_SampleIntervalMs))) { if (!m_Running.load()) { return; } SampleBatch(); } } void EnqueueTimer() { if (!m_Timer || !m_Running.load()) { return; } m_Timer->expires_after(std::chrono::milliseconds(m_SampleIntervalMs)); m_Timer->async_wait([this](const asio::error_code& Ec) { if (Ec || !m_Running.load()) { return; } SampleBatch(); EnqueueTimer(); }); } }; ////////////////////////////////////////////////////////////////////////// ProcessMetricsTracker::ProcessMetricsTracker(uint64_t SampleIntervalMs) : m_Impl(std::make_unique(SampleIntervalMs)) { } ProcessMetricsTracker::ProcessMetricsTracker(asio::io_context& IoContext, uint64_t SampleIntervalMs) : m_Impl(std::make_unique(IoContext, SampleIntervalMs)) { } ProcessMetricsTracker::~ProcessMetricsTracker() = default; void ProcessMetricsTracker::Start() { m_Impl->Start(); } void ProcessMetricsTracker::Stop() { m_Impl->Stop(); } void ProcessMetricsTracker::Add(const ProcessHandle& Handle) { m_Impl->Add(Handle); } void ProcessMetricsTracker::Remove(int Pid) { m_Impl->Remove(Pid); } void ProcessMetricsTracker::Clear() { m_Impl->Clear(); } std::vector ProcessMetricsTracker::GetSnapshot() const { return m_Impl->GetSnapshot(); } AggregateProcessMetrics ProcessMetricsTracker::GetAggregate() const { return m_Impl->GetAggregate(); } } // namespace zen #if ZEN_WITH_TESTS # include using namespace zen; void zen::processmetricstracker_forcelink() { } TEST_SUITE_BEGIN("util.processmetricstracker"); TEST_CASE("ProcessMetricsTracker.SelfProcess") { ProcessMetricsTracker Tracker(100); Tracker.Start(); ProcessHandle Handle; Handle.Initialize(zen::GetCurrentProcessId()); REQUIRE(Handle.IsValid()); int Pid = Handle.Pid(); Tracker.Add(Handle); // Wait for at least two samples so CPU% is computed std::this_thread::sleep_for(std::chrono::milliseconds(350)); auto Snapshot = Tracker.GetSnapshot(); REQUIRE(Snapshot.size() == 1); CHECK(Snapshot[0].Pid == Pid); CHECK(Snapshot[0].Metrics.WorkingSetSize > 0); CHECK(Snapshot[0].Metrics.MemoryBytes > 0); CHECK((Snapshot[0].Metrics.UserTimeMs + Snapshot[0].Metrics.KernelTimeMs) > 0); CHECK(Snapshot[0].CpuUsagePercent >= 0.0f); auto Agg = Tracker.GetAggregate(); CHECK(Agg.ProcessCount == 1); CHECK(Agg.TotalWorkingSetSize > 0); Tracker.Remove(Pid); Snapshot = Tracker.GetSnapshot(); CHECK(Snapshot.empty()); Tracker.Stop(); } TEST_CASE("ProcessMetricsTracker.AsioTimer") { asio::io_context IoContext; ProcessMetricsTracker Tracker(IoContext, 100); Tracker.Start(); ProcessHandle Handle; Handle.Initialize(zen::GetCurrentProcessId()); REQUIRE(Handle.IsValid()); Tracker.Add(Handle); // Run the io_context for enough time to get two samples IoContext.run_for(std::chrono::milliseconds(350)); auto Snapshot = Tracker.GetSnapshot(); REQUIRE(Snapshot.size() == 1); CHECK(Snapshot[0].Metrics.WorkingSetSize > 0); CHECK(Snapshot[0].CpuUsagePercent >= 0.0f); Tracker.Stop(); } TEST_SUITE_END(); #endif