diff options
Diffstat (limited to 'src/zencompute/localrunner.h')
| -rw-r--r-- | src/zencompute/localrunner.h | 100 |
1 files changed, 100 insertions, 0 deletions
diff --git a/src/zencompute/localrunner.h b/src/zencompute/localrunner.h new file mode 100644 index 000000000..35f464805 --- /dev/null +++ b/src/zencompute/localrunner.h @@ -0,0 +1,100 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zencompute/functionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" + +# include <zencore/thread.h> +# include <zencore/zencore.h> +# include <zenstore/cidstore.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/logging.h> + +# include <atomic> +# include <filesystem> +# include <thread> + +namespace zen { +class CbPackage; +} + +namespace zen::compute { + +/** Direct process spawner + + This runner simply sets up a directory structure for each job and + creates a process to perform the computation in it. It is not very + efficient and is intended mostly for testing. + + */ + +class LocalProcessRunner : public FunctionRunner +{ + LocalProcessRunner(LocalProcessRunner&&) = delete; + LocalProcessRunner& operator=(LocalProcessRunner&&) = delete; + +public: + LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir); + ~LocalProcessRunner(); + + virtual void Shutdown() override; + virtual void RegisterWorker(const CbPackage& WorkerPackage) override; + [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override; + [[nodiscard]] virtual bool IsHealthy() override { return true; } + [[nodiscard]] virtual size_t GetSubmittedActionCount() override; + [[nodiscard]] virtual size_t QueryCapacity() override; + [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override; + +protected: + LoggerRef Log() { return m_Log; } + + LoggerRef m_Log; + + struct RunningAction : public RefCounted + { + Ref<RunnerAction> Action; + void* ProcessHandle = nullptr; + int ExitCode = 0; + std::filesystem::path SandboxPath; + }; + + std::atomic_bool m_AcceptNewActions; + ChunkResolver& m_ChunkResolver; + RwLock m_WorkerLock; + std::filesystem::path m_WorkerPath; + std::atomic<int32_t> m_SandboxCounter = 0; + std::filesystem::path m_SandboxPath; + int32_t m_MaxRunningActions = 64; // arbitrary limit for testing + + // if used in conjuction with m_ResultsLock, this lock must be taken *after* + // m_ResultsLock to avoid deadlocks + RwLock m_RunningLock; + std::unordered_map<int, Ref<RunningAction>> m_RunningMap; + + std::thread m_MonitorThread; + std::atomic<bool> m_MonitorThreadEnabled{true}; + Event m_MonitorThreadEvent; + void MonitorThreadFunction(); + void SweepRunningActions(); + void CancelRunningActions(); + + std::filesystem::path CreateNewSandbox(); + void ManifestWorker(const CbPackage& WorkerPackage, + const std::filesystem::path& SandboxPath, + std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback); + std::filesystem::path ManifestWorker(const WorkerDesc& Worker); + CbPackage GatherActionOutputs(std::filesystem::path SandboxPath); + + void DecompressAttachmentToFile(const CbPackage& FromPackage, + CbObjectView FileEntry, + const std::filesystem::path& SandboxRootPath, + std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback); +}; + +} // namespace zen::compute + +#endif |