// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include "zencompute/functionservice.h"

// Everything below is compiled only when ZEN_WITH_COMPUTE_SERVICES is non-zero.
#if ZEN_WITH_COMPUTE_SERVICES

#  include "functionrunner.h"

// NOTE(review): the eight include directives below have no target in this copy
// of the file; the header names appear to have been lost. The declarations
// further down use std::atomic, std::filesystem::path, std::function,
// std::thread, std::unordered_map and std::vector, so the corresponding
// headers are presumably among them -- restore from the original file.
#  include
#  include
#  include
#  include
#  include
#  include
#  include
#  include

namespace zen {
// Forward declaration; CbPackage is only used by const reference or as a
// return type in the declarations in this header.
class CbPackage;
}

namespace zen::compute {

/** Direct process spawner

    This runner simply sets up a directory structure for each job and creates
    a process to perform the computation in it. It is not very efficient and
    is intended mostly for testing.

    NOTE(review): several template argument lists in this declaration appear
    to have been stripped from this copy of the file (`Ref`, `std::vector`,
    `std::atomic`, `std::unordered_map`, `std::function`). The element/value
    types cannot be determined from here; restore them from the original
    before relying on this header.
 */
class LocalProcessRunner : public FunctionRunner
{
    // Move operations are deleted (declared in the private section). Because a
    // move constructor and move assignment operator are user-declared, the
    // implicit copy operations are deleted as well, so instances can be
    // neither copied nor moved.
    LocalProcessRunner(LocalProcessRunner&&) = delete;
    LocalProcessRunner& operator=(LocalProcessRunner&&) = delete;

public:
    /** Construct a runner.

        @param Resolver  Chunk resolver; the class stores a ChunkResolver
                         reference member (m_ChunkResolver), so presumably
                         this object must outlive the runner -- confirm in
                         the implementation.
        @param BaseDir   Base directory; presumably the root under which the
                         worker and sandbox directories are created -- confirm
                         in the implementation.
     */
    LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir);
    ~LocalProcessRunner();

    // Overrides of virtual functions inherited via FunctionRunner. All are
    // defined out of line except IsHealthy().
    virtual void Shutdown() override;
    virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
    [[nodiscard]] virtual SubmitResult SubmitAction(Ref Action) override;
    // This runner unconditionally reports itself as healthy.
    [[nodiscard]] virtual bool IsHealthy() override { return true; }
    [[nodiscard]] virtual size_t GetSubmittedActionCount() override;
    [[nodiscard]] virtual size_t QueryCapacity() override;
    [[nodiscard]] virtual std::vector SubmitActions(const std::vector>& Actions) override;

protected:
    // Accessor returning the logger member below.
    LoggerRef Log() { return m_Log; }

    LoggerRef m_Log;

    /** Per-action bookkeeping record (reference counted via RefCounted). */
    struct RunningAction : public RefCounted
    {
        Ref Action;                         // the action this record belongs to
        void* ProcessHandle = nullptr;      // opaque process handle; null until assigned
        int ExitCode = 0;                   // defaults to 0
        std::filesystem::path SandboxPath;  // presumably the directory the process runs in -- confirm
    };

    // No in-class initializer here.
    // NOTE(review): confirm the constructor initializes this flag.
    std::atomic_bool m_AcceptNewActions;
    ChunkResolver& m_ChunkResolver;
    RwLock m_WorkerLock;
    std::filesystem::path m_WorkerPath;
    // Starts at 0; presumably used to generate unique sandbox names -- confirm.
    std::atomic m_SandboxCounter = 0;
    std::filesystem::path m_SandboxPath;
    int32_t m_MaxRunningActions = 64;  // arbitrary limit for testing

    // if used in conjunction with m_ResultsLock, this lock must be taken *after*
    // m_ResultsLock to avoid deadlocks
    // NOTE(review): m_ResultsLock is not declared in this class; presumably it
    // lives in a base class or elsewhere -- confirm the comment is still current.
    RwLock m_RunningLock;
    std::unordered_map> m_RunningMap;

    // Monitor thread state. The enabled flag is initialized to true.
    // Presumably m_MonitorThread runs MonitorThreadFunction() and
    // m_MonitorThreadEvent is used to wake it -- confirm in the implementation.
    std::thread m_MonitorThread;
    std::atomic m_MonitorThreadEnabled{true};
    Event m_MonitorThreadEvent;

    // Internal helpers, all defined out of line.
    void MonitorThreadFunction();
    void SweepRunningActions();
    void CancelRunningActions();
    // Returns a filesystem path; presumably that of a freshly created sandbox -- confirm.
    std::filesystem::path CreateNewSandbox();
    // Two overloads: one taking a worker package, a sandbox path and a chunk
    // reference callback (rvalue reference), and one taking a WorkerDesc and
    // returning a path.
    void ManifestWorker(const CbPackage& WorkerPackage,
                        const std::filesystem::path& SandboxPath,
                        std::function&& ChunkReferenceCallback);
    std::filesystem::path ManifestWorker(const WorkerDesc& Worker);
    // Takes the sandbox path by value and returns a CbPackage.
    CbPackage GatherActionOutputs(std::filesystem::path SandboxPath);
    // Note the callback is taken by lvalue reference here, unlike the
    // rvalue reference taken by the first ManifestWorker overload.
    void DecompressAttachmentToFile(const CbPackage& FromPackage,
                                    CbObjectView FileEntry,
                                    const std::filesystem::path& SandboxRootPath,
                                    std::function& ChunkReferenceCallback);
};

}  // namespace zen::compute

#endif  // ZEN_WITH_COMPUTE_SERVICES