blob: 35f4648053bee3ae3402f109e8ec840b2121ca6c (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include "zencompute/functionservice.h"
#if ZEN_WITH_COMPUTE_SERVICES
# include "functionrunner.h"
# include <zencore/thread.h>
# include <zencore/zencore.h>
# include <zenstore/cidstore.h>
# include <zencore/compactbinarypackage.h>
# include <zencore/logging.h>
# include <atomic>
# include <filesystem>
# include <thread>
namespace zen {
class CbPackage;
}
namespace zen::compute {
/** Direct process spawner
This runner simply sets up a directory structure for each job and
creates a process to perform the computation in it. It is not very
efficient and is intended mostly for testing.
*/
class LocalProcessRunner : public FunctionRunner
{
LocalProcessRunner(LocalProcessRunner&&) = delete;
LocalProcessRunner& operator=(LocalProcessRunner&&) = delete;
public:
LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir);
~LocalProcessRunner();
virtual void Shutdown() override;
virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
[[nodiscard]] virtual bool IsHealthy() override { return true; }
[[nodiscard]] virtual size_t GetSubmittedActionCount() override;
[[nodiscard]] virtual size_t QueryCapacity() override;
[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
protected:
LoggerRef Log() { return m_Log; }
LoggerRef m_Log;
struct RunningAction : public RefCounted
{
Ref<RunnerAction> Action;
void* ProcessHandle = nullptr;
int ExitCode = 0;
std::filesystem::path SandboxPath;
};
std::atomic_bool m_AcceptNewActions;
ChunkResolver& m_ChunkResolver;
RwLock m_WorkerLock;
std::filesystem::path m_WorkerPath;
std::atomic<int32_t> m_SandboxCounter = 0;
std::filesystem::path m_SandboxPath;
int32_t m_MaxRunningActions = 64; // arbitrary limit for testing
// if used in conjuction with m_ResultsLock, this lock must be taken *after*
// m_ResultsLock to avoid deadlocks
RwLock m_RunningLock;
std::unordered_map<int, Ref<RunningAction>> m_RunningMap;
std::thread m_MonitorThread;
std::atomic<bool> m_MonitorThreadEnabled{true};
Event m_MonitorThreadEvent;
void MonitorThreadFunction();
void SweepRunningActions();
void CancelRunningActions();
std::filesystem::path CreateNewSandbox();
void ManifestWorker(const CbPackage& WorkerPackage,
const std::filesystem::path& SandboxPath,
std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback);
std::filesystem::path ManifestWorker(const WorkerDesc& Worker);
CbPackage GatherActionOutputs(std::filesystem::path SandboxPath);
void DecompressAttachmentToFile(const CbPackage& FromPackage,
CbObjectView FileEntry,
const std::filesystem::path& SandboxRootPath,
std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback);
};
} // namespace zen::compute
#endif
|