aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners/localrunner.h
blob: b8cff68264ffc05c9a4510b2f86881c26ed828fd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include "zencompute/computeservice.h"

#if ZEN_WITH_COMPUTE_SERVICES

#	include "functionrunner.h"

#	include <zencore/thread.h>
#	include <zencore/zencore.h>
#	include <zenstore/cidstore.h>
#	include <zencore/compactbinarypackage.h>
#	include <zencore/logging.h>

#	include "deferreddeleter.h"

#	include <zencore/workthreadpool.h>

#	include <atomic>
#	include <filesystem>
#	include <optional>
#	include <thread>

// Forward declaration of the package type used in the signatures below.
// NOTE(review): <zencore/compactbinarypackage.h> is already included above and
// CbPackage is also held by value further down (PreparedAction::WorkerPackage),
// which needs the complete type - so this declaration is presumably redundant.
// Confirm before removing.
namespace zen {
class CbPackage;
}

namespace zen::compute {

/** Direct process spawner

	This runner simply sets up a directory structure for each job and
	creates a process to perform the computation in it. It is not very
	efficient and is intended mostly for testing.

	Instances cannot be moved. Because the move operations are declared
	(as deleted), the implicitly-declared copy operations are deleted too,
	so instances cannot be copied either.

	Most state and helpers are protected, and some hooks are virtual, so
	that platform-specific runners can derive from this class and override
	them (see SampleProcessCpu, SweepRunningActions, CancelRunningActions).

 */

class LocalProcessRunner : public FunctionRunner
{
	// Non-movable (and, as a consequence, non-copyable) - see class comment.
	LocalProcessRunner(LocalProcessRunner&&) = delete;
	LocalProcessRunner& operator=(LocalProcessRunner&&) = delete;

public:
	/** Construct a runner.

		Resolver, Deleter and WorkerPool are taken by reference; the class holds
		reference members of the same types (m_ChunkResolver, m_DeferredDeleter,
		m_WorkerPool), so presumably they must outlive the runner - confirm
		against the constructor definition.

		NOTE(review): how BaseDir maps onto m_WorkerPath / m_SandboxPath, and what
		the default MaxConcurrentActions == 0 means (presumably "keep the built-in
		m_MaxRunningActions limit"), is decided in the .cpp - confirm there.
	 */
	LocalProcessRunner(ChunkResolver&				Resolver,
					   const std::filesystem::path& BaseDir,
					   DeferredDirectoryDeleter&	Deleter,
					   WorkerThreadPool&			WorkerPool,
					   int32_t						MaxConcurrentActions = 0);
	~LocalProcessRunner();

	// FunctionRunner interface. All results are [[nodiscard]] except Shutdown().
	virtual void									Shutdown() override;
	[[nodiscard]] virtual bool						RegisterWorker(const CbPackage& WorkerPackage) override;
	[[nodiscard]] virtual SubmitResult				SubmitAction(Ref<RunnerAction> Action) override;
	// This runner unconditionally reports itself as healthy.
	[[nodiscard]] virtual bool						IsHealthy() override { return true; }
	[[nodiscard]] virtual size_t					GetSubmittedActionCount() override;
	[[nodiscard]] virtual size_t					QueryCapacity() override;
	[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;

protected:
	// Accessor for the logger held in m_Log.
	LoggerRef Log() { return m_Log; }

	LoggerRef m_Log;

	// Per-action bookkeeping record, reference counted so it can be held via
	// Ref<RunningAction> (as in m_RunningMap and ProcessCompletedActions).
	struct RunningAction : public RefCounted
	{
		Ref<RunnerAction>	  Action;
		// Untyped process handle; presumably interpreted by the platform-specific
		// runner that created the process - TODO confirm.
		void*				  ProcessHandle = nullptr;
		int					  ExitCode		= 0;
		std::filesystem::path SandboxPath;

		// State for periodic CPU usage sampling
		uint64_t LastCpuSampleTicks = 0;  // hifreq timer value at last sample
		uint64_t LastCpuOsTicks		= 0;  // OS CPU ticks (platform-specific units) at last sample
	};

	// NOTE(review): unlike the other atomics in this class, this one has no
	// in-class initializer, so its initial value must come from the constructor
	// (a default-constructed std::atomic is not value-initialized before C++20).
	// Confirm the constructor sets it.
	std::atomic_bool	  m_AcceptNewActions;
	ChunkResolver&		  m_ChunkResolver;
	RwLock				  m_WorkerLock;
	std::filesystem::path m_WorkerPath;
	std::atomic<int32_t>  m_SandboxCounter = 0;
	std::filesystem::path m_SandboxPath;
	int32_t				  m_MaxRunningActions = 64;	 // arbitrary limit for testing

	// if used in conjunction with m_ResultsLock, this lock must be taken *after*
	// m_ResultsLock to avoid deadlocks
	// (m_ResultsLock is not declared in this class; presumably it is inherited
	// from FunctionRunner - TODO confirm.)
	RwLock										m_RunningLock;
	// Actions currently being tracked, keyed by an int id - presumably the action
	// LSN (cf. PreparedAction::ActionLsn); verify against the .cpp.
	std::unordered_map<int, Ref<RunningAction>> m_RunningMap;

	std::atomic<int32_t>	  m_SubmittingCount = 0;
	DeferredDirectoryDeleter& m_DeferredDeleter;
	WorkerThreadPool&		  m_WorkerPool;

	// Background monitor thread and its controls. The enabled flag starts out
	// true; presumably the thread runs MonitorThreadFunction() and is woken or
	// stopped via the flag and the event - confirm in the .cpp.
	std::thread		  m_MonitorThread;
	std::atomic<bool> m_MonitorThreadEnabled{true};
	Event			  m_MonitorThreadEvent;
	void			  MonitorThreadFunction();
	// Virtual so that derived runners can supply their own implementations.
	virtual void	  SweepRunningActions();
	virtual void	  CancelRunningActions();

	// Sample CPU usage for all currently running processes (throttled per-action).
	void SampleRunningProcessCpu();

	// Override in platform runners to sample one process. Called under a shared RunningLock.
	// The default implementation does nothing.
	virtual void SampleProcessCpu(RunningAction& /*Running*/) {}

	// Shared preamble for SubmitAction: capacity check, sandbox creation,
	// worker manifesting, action writing, input manifesting.
	// The result of that preamble is returned as a PreparedAction.
	struct PreparedAction
	{
		int32_t				  ActionLsn;
		std::filesystem::path SandboxPath;
		std::filesystem::path WorkerPath;
		CbPackage			  WorkerPackage;
	};
	// Returns an empty optional when no PreparedAction is produced; presumably
	// this is the "cannot accept / preparation failed" case - confirm in the .cpp.
	std::optional<PreparedAction> PrepareActionSubmission(Ref<RunnerAction> Action);

	// Shared post-processing for SweepRunningActions: gather outputs,
	// set state, clean sandbox.
	void ProcessCompletedActions(std::vector<Ref<RunningAction>>& CompletedActions);

	// Sandbox / worker file-system helpers. Behavior is defined in the .cpp;
	// only the signatures are visible here.
	std::filesystem::path CreateNewSandbox();
	// NOTE(review): this overload takes the chunk callback as an rvalue reference,
	// whereas DecompressAttachmentToFile below takes it as a non-const lvalue
	// reference.
	void				  ManifestWorker(const CbPackage&										 WorkerPackage,
										 const std::filesystem::path&							 SandboxPath,
										 std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback);
	std::filesystem::path ManifestWorker(const WorkerDesc& Worker);
	// Takes the sandbox path by value (a copy is made at the call site unless
	// the caller moves it in).
	CbPackage			  GatherActionOutputs(std::filesystem::path SandboxPath);

	void DecompressAttachmentToFile(const CbPackage&									   FromPackage,
									CbObjectView										   FileEntry,
									const std::filesystem::path&						   SandboxRootPath,
									std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback);
};

}  // namespace zen::compute

#endif