1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include "zencompute/computeservice.h"
#if ZEN_WITH_COMPUTE_SERVICES
# include "functionrunner.h"
# include <zencore/thread.h>
# include <zencore/zencore.h>
# include <zenstore/cidstore.h>
# include <zencore/compactbinarypackage.h>
# include <zencore/logging.h>
# include "deferreddeleter.h"
# include <zencore/workthreadpool.h>
# include <atomic>
# include <cstdint>
# include <filesystem>
# include <functional>
# include <optional>
# include <thread>
# include <unordered_map>
# include <vector>
// Forward declaration of zen::CbPackage, which is used in the runner's
// declarations further down this header.
// NOTE(review): <zencore/compactbinarypackage.h> is already included above; if
// that header provides the full CbPackage definition this forward declaration
// is redundant - confirm before removing.
namespace zen {
class CbPackage;
}
namespace zen::compute {
/** Direct process spawner

    This runner simply sets up a directory structure for each job and
    creates a process to perform the computation in it. It is not very
    efficient and is intended mostly for testing.

    Instances are neither copyable nor movable. The protected section is the
    extension surface for derived runners: the virtual hooks
    (SweepRunningActions, CancelRunningActions, SampleProcessCpu) can be
    overridden, and the remaining helpers are shared building blocks.
 */
class LocalProcessRunner : public FunctionRunner
{
public:
    /** Construct a runner.

        @param Resolver              Chunk resolver, held by reference for the lifetime of the runner.
        @param BaseDir               Base directory for the runner's on-disk state
                                     (presumably the parent of the worker and sandbox
                                     directories - confirm in the implementation).
        @param Deleter               Deferred directory deleter, held by reference.
        @param WorkerPool            Worker thread pool, held by reference.
        @param MaxConcurrentActions  Concurrency limit; defaults to 0 (presumably meaning
                                     "use the built-in default" - confirm in the implementation).
     */
    LocalProcessRunner(ChunkResolver&               Resolver,
                       const std::filesystem::path& BaseDir,
                       DeferredDirectoryDeleter&    Deleter,
                       WorkerThreadPool&            WorkerPool,
                       int32_t                      MaxConcurrentActions = 0);
    ~LocalProcessRunner();

    // Non-copyable and non-movable. The deleted operations are declared public so
    // that accidental use is diagnosed as "deleted" rather than "inaccessible".
    LocalProcessRunner(const LocalProcessRunner&) = delete;
    LocalProcessRunner& operator=(const LocalProcessRunner&) = delete;
    LocalProcessRunner(LocalProcessRunner&&)                 = delete;
    LocalProcessRunner& operator=(LocalProcessRunner&&) = delete;

    // FunctionRunner interface
    virtual void                                    Shutdown() override;
    [[nodiscard]] virtual bool                      RegisterWorker(const CbPackage& WorkerPackage) override;
    [[nodiscard]] virtual SubmitResult              SubmitAction(Ref<RunnerAction> Action) override;
    // This runner always reports itself as healthy.
    [[nodiscard]] virtual bool                      IsHealthy() override { return true; }
    [[nodiscard]] virtual size_t                    GetSubmittedActionCount() override;
    [[nodiscard]] virtual size_t                    QueryCapacity() override;
    [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;

protected:
    // Logger accessor (presumably what the project's logging macros pick up - confirm).
    LoggerRef Log() { return m_Log; }
    LoggerRef m_Log;

    // Book-keeping for one action that has been handed to a child process.
    struct RunningAction : public RefCounted
    {
        Ref<RunnerAction>     Action;
        void*                 ProcessHandle = nullptr;  // opaque process handle (presumably platform-specific - confirm)
        int                   ExitCode      = 0;
        std::filesystem::path SandboxPath;

        // State for periodic CPU usage sampling
        uint64_t LastCpuSampleTicks = 0;  // hifreq timer value at last sample
        uint64_t LastCpuOsTicks     = 0;  // OS CPU ticks (platform-specific units) at last sample
    };

    // Explicitly initialised: before C++20 a default-constructed std::atomic holds
    // an indeterminate value. Any initialisation done by the constructor still
    // takes precedence over this default.
    std::atomic_bool      m_AcceptNewActions{false};
    ChunkResolver&        m_ChunkResolver;
    RwLock                m_WorkerLock;
    std::filesystem::path m_WorkerPath;
    std::atomic<int32_t>  m_SandboxCounter = 0;
    std::filesystem::path m_SandboxPath;
    int32_t               m_MaxRunningActions = 64;  // arbitrary limit for testing

    // if used in conjunction with m_ResultsLock, this lock must be taken *after*
    // m_ResultsLock to avoid deadlocks
    RwLock                                      m_RunningLock;
    std::unordered_map<int, Ref<RunningAction>> m_RunningMap;
    std::atomic<int32_t>                        m_SubmittingCount = 0;
    DeferredDirectoryDeleter&                   m_DeferredDeleter;
    WorkerThreadPool&                           m_WorkerPool;

    // Background monitor thread and the flag/event associated with it.
    std::thread       m_MonitorThread;
    std::atomic<bool> m_MonitorThreadEnabled{true};
    Event             m_MonitorThreadEvent;

    // Entry point executed on m_MonitorThread (presumably - confirm in the implementation).
    void         MonitorThreadFunction();
    virtual void SweepRunningActions();
    virtual void CancelRunningActions();

    // Sample CPU usage for all currently running processes (throttled per-action).
    void SampleRunningProcessCpu();

    // Override in platform runners to sample one process. Called under a shared RunningLock.
    // The default implementation does nothing.
    virtual void SampleProcessCpu(RunningAction& /*Running*/) {}

    // Shared preamble for SubmitAction: capacity check, sandbox creation,
    // worker manifesting, action writing, input manifesting.
    struct PreparedAction
    {
        int32_t               ActionLsn = 0;  // initialised so a default-constructed value is never indeterminate
        std::filesystem::path SandboxPath;
        std::filesystem::path WorkerPath;
        CbPackage             WorkerPackage;
    };

    // Returns the prepared submission state, or an empty optional when there is
    // none (presumably when the action cannot be accepted - confirm in the implementation).
    std::optional<PreparedAction> PrepareActionSubmission(Ref<RunnerAction> Action);

    // Shared post-processing for SweepRunningActions: gather outputs,
    // set state, clean sandbox.
    void ProcessCompletedActions(std::vector<Ref<RunningAction>>& CompletedActions);

    // Sandbox / worker directory helpers. The behaviour of each is defined in the
    // implementation file; only the signatures are visible here.
    std::filesystem::path CreateNewSandbox();
    void                  ManifestWorker(const CbPackage&                                        WorkerPackage,
                                         const std::filesystem::path&                            SandboxPath,
                                         std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback);
    std::filesystem::path ManifestWorker(const WorkerDesc& Worker);
    CbPackage             GatherActionOutputs(std::filesystem::path SandboxPath);
    void                  DecompressAttachmentToFile(const CbPackage&                                       FromPackage,
                                                     CbObjectView                                           FileEntry,
                                                     const std::filesystem::path&                           SandboxRootPath,
                                                     std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback);
};
} // namespace zen::compute
#endif
|