aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/localrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/localrunner.cpp')
-rw-r--r--src/zencompute/localrunner.cpp722
1 files changed, 0 insertions, 722 deletions
diff --git a/src/zencompute/localrunner.cpp b/src/zencompute/localrunner.cpp
deleted file mode 100644
index 9a27f3f3d..000000000
--- a/src/zencompute/localrunner.cpp
+++ /dev/null
@@ -1,722 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "localrunner.h"
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include <zencore/compactbinary.h>
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/compactbinarypackage.h>
-# include <zencore/compress.h>
-# include <zencore/except.h>
-# include <zencore/filesystem.h>
-# include <zencore/fmtutils.h>
-# include <zencore/iobuffer.h>
-# include <zencore/iohash.h>
-# include <zencore/system.h>
-# include <zencore/scopeguard.h>
-# include <zencore/timer.h>
-# include <zenstore/cidstore.h>
-
-# include <span>
-
-namespace zen::compute {
-
-using namespace std::literals;
-
-LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir)
-: FunctionRunner(BaseDir)
-, m_Log(logging::Get("local_exec"))
-, m_ChunkResolver(Resolver)
-, m_WorkerPath(std::filesystem::weakly_canonical(BaseDir / "workers"))
-, m_SandboxPath(std::filesystem::weakly_canonical(BaseDir / "scratch"))
-{
- SystemMetrics Sm = GetSystemMetricsForReporting();
-
- m_MaxRunningActions = Sm.LogicalProcessorCount * 2;
-
- ZEN_INFO("Max concurrent action count: {}", m_MaxRunningActions);
-
- bool DidCleanup = false;
-
- if (std::filesystem::is_directory(m_ActionsPath))
- {
- ZEN_INFO("Cleaning '{}'", m_ActionsPath);
-
- std::error_code Ec;
- CleanDirectory(m_ActionsPath, /* ForceRemoveReadOnlyFiles */ true, Ec);
-
- if (Ec)
- {
- ZEN_WARN("Unable to clean '{}': {}", m_ActionsPath, Ec.message());
- }
-
- DidCleanup = true;
- }
-
- if (std::filesystem::is_directory(m_SandboxPath))
- {
- ZEN_INFO("Cleaning '{}'", m_SandboxPath);
- std::error_code Ec;
- CleanDirectory(m_SandboxPath, /* ForceRemoveReadOnlyFiles */ true, Ec);
-
- if (Ec)
- {
- ZEN_WARN("Unable to clean '{}': {}", m_SandboxPath, Ec.message());
- }
-
- DidCleanup = true;
- }
-
- // We clean out all workers on startup since we can't know they are good. They could be bad
- // due to tampering, malware (which I also mean to include AV and antimalware software) or
- // other processes we have no control over
- if (std::filesystem::is_directory(m_WorkerPath))
- {
- ZEN_INFO("Cleaning '{}'", m_WorkerPath);
- std::error_code Ec;
- CleanDirectory(m_WorkerPath, /* ForceRemoveReadOnlyFiles */ true, Ec);
-
- if (Ec)
- {
- ZEN_WARN("Unable to clean '{}': {}", m_WorkerPath, Ec.message());
- }
-
- DidCleanup = true;
- }
-
- if (DidCleanup)
- {
- ZEN_INFO("Cleanup complete");
- }
-
- m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this};
-
-# if ZEN_PLATFORM_WINDOWS
- // Suppress any error dialogs caused by missing dependencies
- UINT OldMode = ::SetErrorMode(0);
- ::SetErrorMode(OldMode | SEM_FAILCRITICALERRORS);
-# endif
-
- m_AcceptNewActions = true;
-}
-
-LocalProcessRunner::~LocalProcessRunner()
-{
- try
- {
- Shutdown();
- }
- catch (std::exception& Ex)
- {
- ZEN_WARN("exception during local process runner shutdown: {}", Ex.what());
- }
-}
-
-void
-LocalProcessRunner::Shutdown()
-{
- m_AcceptNewActions = false;
-
- m_MonitorThreadEnabled = false;
- m_MonitorThreadEvent.Set();
- if (m_MonitorThread.joinable())
- {
- m_MonitorThread.join();
- }
-
- CancelRunningActions();
-}
-
-std::filesystem::path
-LocalProcessRunner::CreateNewSandbox()
-{
- std::string UniqueId = std::to_string(++m_SandboxCounter);
- std::filesystem::path Path = m_SandboxPath / UniqueId;
- zen::CreateDirectories(Path);
-
- return Path;
-}
-
-void
-LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage)
-{
- if (m_DumpActions)
- {
- CbObject WorkerDescriptor = WorkerPackage.GetObject();
- const IoHash& WorkerId = WorkerPackage.GetObjectHash();
-
- std::string UniqueId = fmt::format("worker_{}"sv, WorkerId);
- std::filesystem::path Path = m_ActionsPath / UniqueId;
-
- zen::WriteFile(Path / "worker.ucb", WorkerDescriptor.GetBuffer().AsIoBuffer());
-
- ManifestWorker(WorkerPackage, Path / "tree", [&](const IoHash& Cid, CompressedBuffer& ChunkBuffer) {
- std::filesystem::path ChunkPath = Path / "chunks" / Cid.ToHexString();
- zen::WriteFile(ChunkPath, ChunkBuffer.GetCompressed());
- });
-
- ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, Path);
- }
-}
-
-size_t
-LocalProcessRunner::QueryCapacity()
-{
- // Estimate how much more work we're ready to accept
-
- RwLock::SharedLockScope _{m_RunningLock};
-
- if (!m_AcceptNewActions)
- {
- return 0;
- }
-
- size_t RunningCount = m_RunningMap.size();
-
- if (RunningCount >= size_t(m_MaxRunningActions))
- {
- return 0;
- }
-
- return m_MaxRunningActions - RunningCount;
-}
-
-std::vector<SubmitResult>
-LocalProcessRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
-{
- std::vector<SubmitResult> Results;
-
- for (const Ref<RunnerAction>& Action : Actions)
- {
- Results.push_back(SubmitAction(Action));
- }
-
- return Results;
-}
-
-SubmitResult
-LocalProcessRunner::SubmitAction(Ref<RunnerAction> Action)
-{
- // Verify whether we can accept more work
-
- {
- RwLock::SharedLockScope _{m_RunningLock};
-
- if (!m_AcceptNewActions)
- {
- return SubmitResult{.IsAccepted = false};
- }
-
- if (m_RunningMap.size() >= size_t(m_MaxRunningActions))
- {
- return SubmitResult{.IsAccepted = false};
- }
- }
-
- using namespace std::literals;
-
- // Each enqueued action is assigned an integer index (logical sequence number),
- // which we use as a key for tracking data structures and as an opaque id which
- // may be used by clients to reference the scheduled action
-
- const int32_t ActionLsn = Action->ActionLsn;
- const CbObject& ActionObj = Action->ActionObj;
- const IoHash ActionId = ActionObj.GetHash();
-
- MaybeDumpAction(ActionLsn, ActionObj);
-
- std::filesystem::path SandboxPath = CreateNewSandbox();
-
- CbPackage WorkerPackage = Action->Worker.Descriptor;
-
- std::filesystem::path WorkerPath = ManifestWorker(Action->Worker);
-
- // Write out action
-
- zen::WriteFile(SandboxPath / "build.action", ActionObj.GetBuffer().AsIoBuffer());
-
- // Manifest inputs in sandbox
-
- ActionObj.IterateAttachments([&](CbFieldView Field) {
- const IoHash Cid = Field.AsHash();
- std::filesystem::path FilePath{SandboxPath / "Inputs"sv / Cid.ToHexString()};
- IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(Cid);
-
- if (!DataBuffer)
- {
- throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid));
- }
-
- zen::WriteFile(FilePath, DataBuffer);
- });
-
-# if ZEN_PLATFORM_WINDOWS
- // Set up environment variables
-
- StringBuilder<1024> EnvironmentBlock;
-
- CbObject WorkerDescription = WorkerPackage.GetObject();
-
- for (auto& It : WorkerDescription["environment"sv])
- {
- EnvironmentBlock.Append(It.AsString());
- EnvironmentBlock.Append('\0');
- }
- EnvironmentBlock.Append('\0');
- EnvironmentBlock.Append('\0');
-
- // Execute process - this spawns the child process immediately without waiting
- // for completion
-
- std::string_view ExecPath = WorkerDescription["path"sv].AsString();
- std::filesystem::path ExePath = WorkerPath / std::filesystem::path(ExecPath).make_preferred();
-
- ExtendableWideStringBuilder<512> CommandLine;
- CommandLine.Append(L'"');
- CommandLine.Append(ExePath.c_str());
- CommandLine.Append(L'"');
- CommandLine.Append(L" -Build=build.action");
-
- LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr;
- LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr;
- BOOL bInheritHandles = FALSE;
- DWORD dwCreationFlags = 0;
-
- STARTUPINFO StartupInfo{};
- StartupInfo.cb = sizeof StartupInfo;
-
- PROCESS_INFORMATION ProcessInformation{};
-
- ZEN_DEBUG("Executing: {}", WideToUtf8(CommandLine.c_str()));
-
- CommandLine.EnsureNulTerminated();
-
- BOOL Success = CreateProcessW(nullptr,
- CommandLine.Data(),
- lpProcessAttributes,
- lpThreadAttributes,
- bInheritHandles,
- dwCreationFlags,
- (LPVOID)EnvironmentBlock.Data(), // Environment block
- SandboxPath.c_str(), // Current directory
- &StartupInfo,
- /* out */ &ProcessInformation);
-
- if (!Success)
- {
- // TODO: this is probably not the best way to report failure. The return
- // object should include a failure state and context
-
- zen::ThrowLastError("Unable to launch process" /* TODO: Add context */);
- }
-
- CloseHandle(ProcessInformation.hThread);
-
- Ref<RunningAction> NewAction{new RunningAction()};
- NewAction->Action = Action;
- NewAction->ProcessHandle = ProcessInformation.hProcess;
- NewAction->SandboxPath = std::move(SandboxPath);
-
- {
- RwLock::ExclusiveLockScope _(m_RunningLock);
-
- m_RunningMap[ActionLsn] = std::move(NewAction);
- }
-
- Action->SetActionState(RunnerAction::State::Running);
-# else
- ZEN_UNUSED(ActionId);
-
- ZEN_NOT_IMPLEMENTED();
-
- int ExitCode = 0;
-# endif
-
- return SubmitResult{.IsAccepted = true};
-}
-
-size_t
-LocalProcessRunner::GetSubmittedActionCount()
-{
- RwLock::SharedLockScope _(m_RunningLock);
- return m_RunningMap.size();
-}
-
-std::filesystem::path
-LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker)
-{
- RwLock::SharedLockScope _(m_WorkerLock);
-
- std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId);
-
- if (!std::filesystem::exists(WorkerDir))
- {
- _.ReleaseNow();
-
- RwLock::ExclusiveLockScope $(m_WorkerLock);
-
- if (!std::filesystem::exists(WorkerDir))
- {
- ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {});
- }
- }
-
- return WorkerDir;
-}
-
-void
-LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromPackage,
- CbObjectView FileEntry,
- const std::filesystem::path& SandboxRootPath,
- std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback)
-{
- std::string_view Name = FileEntry["name"sv].AsString();
- const IoHash ChunkHash = FileEntry["hash"sv].AsHash();
- const uint64_t Size = FileEntry["size"sv].AsUInt64();
-
- CompressedBuffer Compressed;
-
- if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash))
- {
- Compressed = Attachment->AsCompressedBinary();
- }
- else
- {
- IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(ChunkHash);
-
- if (!DataBuffer)
- {
- throw std::runtime_error(fmt::format("worker chunk '{}' missing", ChunkHash));
- }
-
- uint64_t DataRawSize = 0;
- IoHash DataRawHash;
- Compressed = CompressedBuffer::FromCompressed(SharedBuffer{DataBuffer}, DataRawHash, DataRawSize);
-
- if (DataRawSize != Size)
- {
- throw std::runtime_error(
- fmt::format("worker chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size));
- }
- }
-
- ChunkReferenceCallback(ChunkHash, Compressed);
-
- std::filesystem::path FilePath{SandboxRootPath / std::filesystem::path(Name).make_preferred()};
-
- SharedBuffer Decompressed = Compressed.Decompress();
- zen::WriteFile(FilePath, Decompressed.AsIoBuffer());
-}
-
-void
-LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage,
- const std::filesystem::path& SandboxPath,
- std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback)
-{
- CbObject WorkerDescription = WorkerPackage.GetObject();
-
- // Manifest worker in Sandbox
-
- for (auto& It : WorkerDescription["executables"sv])
- {
- DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback);
- }
-
- for (auto& It : WorkerDescription["dirs"sv])
- {
- std::string_view Name = It.AsString();
- std::filesystem::path DirPath{SandboxPath / std::filesystem::path(Name).make_preferred()};
- zen::CreateDirectories(DirPath);
- }
-
- for (auto& It : WorkerDescription["files"sv])
- {
- DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback);
- }
-
- WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer());
-}
-
-CbPackage
-LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath)
-{
- std::filesystem::path OutputFile = SandboxPath / "build.output";
- FileContents OutputData = zen::ReadFile(OutputFile);
-
- if (OutputData.ErrorCode)
- {
- throw std::system_error(OutputData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputFile));
- }
-
- CbPackage OutputPackage;
- CbObject Output = zen::LoadCompactBinaryObject(OutputData.Flatten());
-
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalRawAttachmentBytes = 0;
-
- Output.IterateAttachments([&](CbFieldView Field) {
- IoHash Hash = Field.AsHash();
- std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()};
- FileContents ChunkData = zen::ReadFile(OutputPath);
-
- if (ChunkData.ErrorCode)
- {
- throw std::system_error(ChunkData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputPath));
- }
-
- uint64_t ChunkDataRawSize = 0;
- IoHash ChunkDataHash;
- CompressedBuffer AttachmentBuffer =
- CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Flatten()), ChunkDataHash, ChunkDataRawSize);
-
- if (!AttachmentBuffer)
- {
- throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)");
- }
-
- TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
- TotalRawAttachmentBytes += ChunkDataRawSize;
-
- CbAttachment Attachment(std::move(AttachmentBuffer), ChunkDataHash);
- OutputPackage.AddAttachment(Attachment);
- });
-
- OutputPackage.SetObject(Output);
-
- ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)",
- OutputPackage.GetAttachments().size(),
- NiceBytes(TotalAttachmentBytes),
- NiceBytes(TotalRawAttachmentBytes));
-
- return OutputPackage;
-}
-
-void
-LocalProcessRunner::MonitorThreadFunction()
-{
- SetCurrentThreadName("LocalProcessRunner_Monitor");
-
- auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); });
-
- do
- {
- // On Windows it's possible to wait on process handles, so we wait for either a process to exit
- // or for the monitor event to be signaled (which indicates we should check for cancellation
- // or shutdown). This could be further improved by using a completion port and registering process
- // handles with it, but this is a reasonable first implementation given that we shouldn't be dealing
- // with an enormous number of concurrent processes.
- //
- // On other platforms we just wait on the monitor event and poll for process exits at intervals.
-# if ZEN_PLATFORM_WINDOWS
- auto WaitOnce = [&] {
- HANDLE WaitHandles[MAXIMUM_WAIT_OBJECTS];
-
- uint32_t NumHandles = 0;
-
- WaitHandles[NumHandles++] = m_MonitorThreadEvent.GetWindowsHandle();
-
- m_RunningLock.WithSharedLock([&] {
- for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd && NumHandles < MAXIMUM_WAIT_OBJECTS; ++It)
- {
- Ref<RunningAction> Action = It->second;
-
- WaitHandles[NumHandles++] = Action->ProcessHandle;
- }
- });
-
- DWORD WaitResult = WaitForMultipleObjects(NumHandles, WaitHandles, FALSE, 1000);
-
- // return true if a handle was signaled
- return (WaitResult <= NumHandles);
- };
-# else
- auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(1000); };
-# endif
-
- while (!WaitOnce())
- {
- if (m_MonitorThreadEnabled == false)
- {
- return;
- }
-
- SweepRunningActions();
- }
-
- // Signal received
-
- SweepRunningActions();
- } while (m_MonitorThreadEnabled);
-}
-
-void
-LocalProcessRunner::CancelRunningActions()
-{
- Stopwatch Timer;
- std::unordered_map<int, Ref<RunningAction>> RunningMap;
-
- m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); });
-
- if (RunningMap.empty())
- {
- return;
- }
-
- ZEN_INFO("cancelling all running actions");
-
- // For expedience we initiate the process termination for all known
- // processes before attempting to wait for them to exit.
-
- std::vector<int> TerminatedLsnList;
-
- for (const auto& Kv : RunningMap)
- {
- Ref<RunningAction> Action = Kv.second;
-
- // Terminate running process
-
-# if ZEN_PLATFORM_WINDOWS
- BOOL Success = TerminateProcess(Action->ProcessHandle, 222);
-
- if (Success)
- {
- TerminatedLsnList.push_back(Kv.first);
- }
- else
- {
- DWORD LastError = GetLastError();
-
- if (LastError != ERROR_ACCESS_DENIED)
- {
- ZEN_WARN("TerminateProcess for LSN {} not successful: {}", Action->Action->ActionLsn, GetSystemErrorAsString(LastError));
- }
- }
-# else
- ZEN_NOT_IMPLEMENTED("need to implement process termination");
-# endif
- }
-
- // We only post results for processes we have terminated, in order
- // to avoid multiple results getting posted for the same action
-
- for (int Lsn : TerminatedLsnList)
- {
- if (auto It = RunningMap.find(Lsn); It != RunningMap.end())
- {
- Ref<RunningAction> Running = It->second;
-
-# if ZEN_PLATFORM_WINDOWS
- if (Running->ProcessHandle != INVALID_HANDLE_VALUE)
- {
- DWORD WaitResult = WaitForSingleObject(Running->ProcessHandle, 2000);
-
- if (WaitResult != WAIT_OBJECT_0)
- {
- ZEN_WARN("wait for LSN {}: process exit did not succeed, result = {}", Running->Action->ActionLsn, WaitResult);
- }
- else
- {
- ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn);
- }
- }
-# endif
-
- // Clean up and post error result
-
- DeleteDirectories(Running->SandboxPath);
- Running->Action->SetActionState(RunnerAction::State::Failed);
- }
- }
-
- ZEN_INFO("DONE - cancelled {} running processes (took {})", TerminatedLsnList.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-}
-
-void
-LocalProcessRunner::SweepRunningActions()
-{
- std::vector<Ref<RunningAction>> CompletedActions;
-
- m_RunningLock.WithExclusiveLock([&] {
- // TODO: It would be good to not hold the exclusive lock while making
- // system calls and other expensive operations.
-
- for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;)
- {
- Ref<RunningAction> Action = It->second;
-
-# if ZEN_PLATFORM_WINDOWS
- DWORD ExitCode = 0;
- BOOL IsSuccess = GetExitCodeProcess(Action->ProcessHandle, &ExitCode);
-
- if (IsSuccess && ExitCode != STILL_ACTIVE)
- {
- CloseHandle(Action->ProcessHandle);
- Action->ProcessHandle = INVALID_HANDLE_VALUE;
-
- CompletedActions.push_back(std::move(Action));
- It = m_RunningMap.erase(It);
- }
- else
- {
- ++It;
- }
-# else
- // TODO: implement properly for Mac/Linux
-
- ZEN_UNUSED(Action);
-# endif
- }
- });
-
- // Notify outer. Note that this has to be done without holding any local locks
- // otherwise we may end up with deadlocks.
-
- for (Ref<RunningAction> Running : CompletedActions)
- {
- const int ActionLsn = Running->Action->ActionLsn;
-
- if (Running->ExitCode == 0)
- {
- try
- {
- // Gather outputs
-
- CbPackage OutputPackage = GatherActionOutputs(Running->SandboxPath);
-
- Running->Action->SetResult(std::move(OutputPackage));
- Running->Action->SetActionState(RunnerAction::State::Completed);
-
- // We can delete the files at this point
- if (!DeleteDirectories(Running->SandboxPath))
- {
- ZEN_WARN("Unable to delete directory '{}', this will continue to exist until service restart", Running->SandboxPath);
- }
-
- // Success -- continue with next iteration of the loop
- continue;
- }
- catch (std::exception& Ex)
- {
- ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what());
- }
- }
-
- // Failed - for now this is indicated with an empty package in
- // the results map. We can clean out the sandbox directory immediately.
-
- std::error_code Ec;
- DeleteDirectories(Running->SandboxPath, Ec);
-
- if (Ec)
- {
- ZEN_WARN("Unable to delete sandbox directory '{}': {}", Running->SandboxPath, Ec.message());
- }
-
- Running->Action->SetActionState(RunnerAction::State::Failed);
- }
-}
-
-} // namespace zen::compute
-
-#endif