aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/localrunner.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/localrunner.h')
-rw-r--r--src/zencompute/localrunner.h100
1 files changed, 100 insertions, 0 deletions
diff --git a/src/zencompute/localrunner.h b/src/zencompute/localrunner.h
new file mode 100644
index 000000000..35f464805
--- /dev/null
+++ b/src/zencompute/localrunner.h
@@ -0,0 +1,100 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zencompute/functionservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+
+# include <zencore/thread.h>
+# include <zencore/zencore.h>
+# include <zenstore/cidstore.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/logging.h>
+
+# include <atomic>
+# include <filesystem>
+# include <thread>
+
+namespace zen {
+class CbPackage;
+}
+
+namespace zen::compute {
+
+/** Direct process spawner
+
+ This runner simply sets up a directory structure for each job and
+ creates a process to perform the computation in it. It is not very
+ efficient and is intended mostly for testing.
+
+ */
+
+class LocalProcessRunner : public FunctionRunner
+{
+ LocalProcessRunner(LocalProcessRunner&&) = delete;
+ LocalProcessRunner& operator=(LocalProcessRunner&&) = delete;
+
+public:
+ LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir);
+ ~LocalProcessRunner();
+
+ virtual void Shutdown() override;
+ virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
+ [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
+ [[nodiscard]] virtual bool IsHealthy() override { return true; }
+ [[nodiscard]] virtual size_t GetSubmittedActionCount() override;
+ [[nodiscard]] virtual size_t QueryCapacity() override;
+ [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
+
+protected:
+ LoggerRef Log() { return m_Log; }
+
+ LoggerRef m_Log;
+
+ struct RunningAction : public RefCounted
+ {
+ Ref<RunnerAction> Action;
+ void* ProcessHandle = nullptr;
+ int ExitCode = 0;
+ std::filesystem::path SandboxPath;
+ };
+
+ std::atomic_bool m_AcceptNewActions;
+ ChunkResolver& m_ChunkResolver;
+ RwLock m_WorkerLock;
+ std::filesystem::path m_WorkerPath;
+ std::atomic<int32_t> m_SandboxCounter = 0;
+ std::filesystem::path m_SandboxPath;
+ int32_t m_MaxRunningActions = 64; // arbitrary limit for testing
+
+ // if used in conjuction with m_ResultsLock, this lock must be taken *after*
+ // m_ResultsLock to avoid deadlocks
+ RwLock m_RunningLock;
+ std::unordered_map<int, Ref<RunningAction>> m_RunningMap;
+
+ std::thread m_MonitorThread;
+ std::atomic<bool> m_MonitorThreadEnabled{true};
+ Event m_MonitorThreadEvent;
+ void MonitorThreadFunction();
+ void SweepRunningActions();
+ void CancelRunningActions();
+
+ std::filesystem::path CreateNewSandbox();
+ void ManifestWorker(const CbPackage& WorkerPackage,
+ const std::filesystem::path& SandboxPath,
+ std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback);
+ std::filesystem::path ManifestWorker(const WorkerDesc& Worker);
+ CbPackage GatherActionOutputs(std::filesystem::path SandboxPath);
+
+ void DecompressAttachmentToFile(const CbPackage& FromPackage,
+ CbObjectView FileEntry,
+ const std::filesystem::path& SandboxRootPath,
+ std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback);
+};
+
+} // namespace zen::compute
+
+#endif