// Copyright Epic Games, Inc. All Rights Reserved.

// NOTE(review): this copy of the header appears to have lost every `<...>` span
// (system/library include targets and template argument lists) - e.g. the bare
// `# include` lines, `Ref Action`, `std::vector>` and `std::atomic m_SandboxCounter`
// below. The tokens are reproduced as found; restore the missing text from version
// control before relying on this file.

#pragma once

#include "zencompute/computeservice.h"

// Everything below is compiled only when compute services are enabled.
#if ZEN_WITH_COMPUTE_SERVICES

# include "functionrunner.h"

// NOTE(review): include targets missing in this copy
# include
# include
# include
# include
# include

# include "deferreddeleter.h"

// NOTE(review): include targets missing in this copy
# include
# include
# include
# include
# include

namespace zen {
// Forward declaration; CbPackage is used by reference and by value in the declarations below.
class CbPackage;
}

namespace zen::compute {

/** Direct process spawner

    This runner simply sets up a directory structure for each job and
    creates a process to perform the computation in it. It is not very
    efficient and is intended mostly for testing.
 */
class LocalProcessRunner : public FunctionRunner
{
    // Not movable. Declaring the move operations (even as deleted) also
    // suppresses the implicitly generated copy operations.
    LocalProcessRunner(LocalProcessRunner&&) = delete;
    LocalProcessRunner& operator=(LocalProcessRunner&&) = delete;

public:
    // Resolver, Deleter and WorkerPool are taken by reference; the reference members
    // m_ChunkResolver, m_DeferredDeleter and m_WorkerPool declared below presumably
    // bind to them, in which case the referenced objects must outlive this runner -
    // TODO confirm against the constructor definition.
    // The meaning of the default MaxConcurrentActions value (0) is not visible in
    // this header.
    LocalProcessRunner(ChunkResolver& Resolver,
                       const std::filesystem::path& BaseDir,
                       DeferredDirectoryDeleter& Deleter,
                       WorkerThreadPool& WorkerPool,
                       int32_t MaxConcurrentActions = 0);
    ~LocalProcessRunner();

    // FunctionRunner interface overrides.
    virtual void Shutdown() override;
    [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) override;
    // NOTE(review): template argument of `Ref` missing in this copy
    [[nodiscard]] virtual SubmitResult SubmitAction(Ref Action) override;
    // This runner unconditionally reports itself as healthy.
    [[nodiscard]] virtual bool IsHealthy() override { return true; }
    [[nodiscard]] virtual size_t GetSubmittedActionCount() override;
    [[nodiscard]] virtual size_t QueryCapacity() override;
    // Batch counterpart of SubmitAction.
    // NOTE(review): template arguments of both vectors missing in this copy
    [[nodiscard]] virtual std::vector SubmitActions(const std::vector>& Actions) override;

protected:
    // Accessor for the logger stored in m_Log.
    LoggerRef Log() { return m_Log; }

    LoggerRef m_Log;

    // Per-action bookkeeping record for an action that has been handed to a process.
    struct RunningAction : public RefCounted
    {
        Ref Action;  // NOTE(review): template argument of `Ref` missing in this copy
        void* ProcessHandle = nullptr;  // opaque handle; presumably platform-specific - confirm in the platform runners
        int ExitCode = 0;
        std::filesystem::path SandboxPath;

        // State for periodic CPU usage sampling
        uint64_t LastCpuSampleTicks = 0;  // hifreq timer value at last sample
        uint64_t LastCpuOsTicks = 0;      // OS CPU ticks (platform-specific units) at last sample
    };

    // No in-class initializer; presumably set by the constructor - TODO confirm.
    std::atomic_bool m_AcceptNewActions;
    ChunkResolver& m_ChunkResolver;
    RwLock m_WorkerLock;
    std::filesystem::path m_WorkerPath;
    std::atomic m_SandboxCounter = 0;  // NOTE(review): template argument missing in this copy
    std::filesystem::path m_SandboxPath;
    int32_t m_MaxRunningActions = 64;  // arbitrary limit for testing

    // if used in conjunction with m_ResultsLock, this lock must be taken *after*
    // m_ResultsLock to avoid deadlocks
    // NOTE(review): m_ResultsLock is not declared in this class; presumably it is
    // declared in FunctionRunner or elsewhere - confirm.
    RwLock m_RunningLock;
    // NOTE(review): template arguments (key and mapped type) missing in this copy
    std::unordered_map> m_RunningMap;

    std::atomic m_SubmittingCount = 0;  // NOTE(review): template argument missing in this copy
    DeferredDirectoryDeleter& m_DeferredDeleter;
    WorkerThreadPool& m_WorkerPool;

    // Monitor thread state. The flag starts out enabled (true).
    std::thread m_MonitorThread;
    std::atomic m_MonitorThreadEnabled{true};  // NOTE(review): template argument missing in this copy
    Event m_MonitorThreadEvent;

    // Presumably the entry point run by m_MonitorThread - confirm in the implementation.
    void MonitorThreadFunction();
    // Virtual hooks, so derived runners can override sweeping and cancellation.
    virtual void SweepRunningActions();
    virtual void CancelRunningActions();

    // Sample CPU usage for all currently running processes (throttled per-action).
    void SampleRunningProcessCpu();

    // Override in platform runners to sample one process. Called under a shared RunningLock.
    // The default implementation does nothing.
    virtual void SampleProcessCpu(RunningAction& /*Running*/) {}

    // Shared preamble for SubmitAction: capacity check, sandbox creation,
    // worker manifesting, action writing, input manifesting.
    struct PreparedAction
    {
        int32_t ActionLsn;  // no default member initializer
        std::filesystem::path SandboxPath;
        std::filesystem::path WorkerPath;
        CbPackage WorkerPackage;
    };
    // NOTE(review): template arguments of `std::optional` and `Ref` missing in this
    // copy; the result is presumably an optional PreparedAction - confirm.
    std::optional PrepareActionSubmission(Ref Action);

    // Shared post-processing for SweepRunningActions: gather outputs,
    // set state, clean sandbox.
    // NOTE(review): template arguments of the vector missing in this copy
    void ProcessCompletedActions(std::vector>& CompletedActions);

    std::filesystem::path CreateNewSandbox();
    // Two overloads: one takes a worker package plus a target sandbox path and a
    // chunk-reference callback, the other takes a WorkerDesc and returns a path.
    // NOTE(review): the `std::function` signature is missing in this copy
    void ManifestWorker(const CbPackage& WorkerPackage,
                        const std::filesystem::path& SandboxPath,
                        std::function&& ChunkReferenceCallback);
    std::filesystem::path ManifestWorker(const WorkerDesc& Worker);
    // Note: takes the sandbox path by value.
    CbPackage GatherActionOutputs(std::filesystem::path SandboxPath);
    // NOTE(review): the `std::function` signature is missing in this copy
    void DecompressAttachmentToFile(const CbPackage& FromPackage,
                                    CbObjectView FileEntry,
                                    const std::filesystem::path& SandboxRootPath,
                                    std::function& ChunkReferenceCallback);
};

}  // namespace zen::compute

#endif