aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners/macrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/runners/macrunner.cpp')
-rw-r--r--src/zencompute/runners/macrunner.cpp491
1 files changed, 491 insertions, 0 deletions
diff --git a/src/zencompute/runners/macrunner.cpp b/src/zencompute/runners/macrunner.cpp
new file mode 100644
index 000000000..5cec90699
--- /dev/null
+++ b/src/zencompute/runners/macrunner.cpp
@@ -0,0 +1,491 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "macrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES && ZEN_PLATFORM_MAC
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/except.h>
+# include <zencore/except_fmt.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/timer.h>
+# include <zencore/trace.h>
+
+# include <fcntl.h>
+# include <libproc.h>
+# include <sandbox.h>
+# include <signal.h>
+# include <sys/wait.h>
+# include <unistd.h>
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+namespace {
+
+ // All helper functions in this namespace are async-signal-safe (safe to call
+ // between fork() and execve()). They use only raw syscalls and avoid any
+ // heap allocation, stdio, or other non-AS-safe operations.
+
+ void WriteToFd(int Fd, const char* Buf, size_t Len)
+ {
+ while (Len > 0)
+ {
+ ssize_t Written = write(Fd, Buf, Len);
+ if (Written <= 0)
+ {
+ break;
+ }
+ Buf += Written;
+ Len -= static_cast<size_t>(Written);
+ }
+ }
+
+ [[noreturn]] void WriteErrorAndExit(int ErrorPipeFd, const char* Msg, int Errno)
+ {
+ // Write the message prefix
+ size_t MsgLen = 0;
+ for (const char* P = Msg; *P; ++P)
+ {
+ ++MsgLen;
+ }
+ WriteToFd(ErrorPipeFd, Msg, MsgLen);
+
+ // Append ": " and the errno string if non-zero
+ if (Errno != 0)
+ {
+ WriteToFd(ErrorPipeFd, ": ", 2);
+ const char* ErrStr = strerror(Errno);
+ size_t ErrLen = 0;
+ for (const char* P = ErrStr; *P; ++P)
+ {
+ ++ErrLen;
+ }
+ WriteToFd(ErrorPipeFd, ErrStr, ErrLen);
+ }
+
+ _exit(127);
+ }
+
+ // Build a Seatbelt profile string that denies everything by default and
+ // allows only the minimum needed for the worker to execute: process ops,
+ // system library reads, worker directory (read-only), and sandbox directory
+ // (read-write). Network access is denied implicitly by the deny-default policy.
+ std::string BuildSandboxProfile(const std::string& SandboxPath, const std::string& WorkerPath)
+ {
+ std::string Profile;
+ Profile.reserve(1024);
+
+ Profile += "(version 1)\n";
+ Profile += "(deny default)\n";
+ Profile += "(allow process*)\n";
+ Profile += "(allow sysctl-read)\n";
+ Profile += "(allow file-read-metadata)\n";
+
+ // System library paths needed for dynamic linker and runtime
+ Profile += "(allow file-read* (subpath \"/usr\"))\n";
+ Profile += "(allow file-read* (subpath \"/System\"))\n";
+ Profile += "(allow file-read* (subpath \"/Library\"))\n";
+ Profile += "(allow file-read* (subpath \"/dev\"))\n";
+ Profile += "(allow file-read* (subpath \"/private/var/db/dyld\"))\n";
+ Profile += "(allow file-read* (subpath \"/etc\"))\n";
+
+ // Worker directory: read-only
+ Profile += "(allow file-read* (subpath \"";
+ Profile += WorkerPath;
+ Profile += "\"))\n";
+
+ // Sandbox directory: read+write
+ Profile += "(allow file-read* file-write* (subpath \"";
+ Profile += SandboxPath;
+ Profile += "\"))\n";
+
+ return Profile;
+ }
+
+} // anonymous namespace
+
+MacProcessRunner::MacProcessRunner(ChunkResolver& Resolver,
+ const std::filesystem::path& BaseDir,
+ DeferredDirectoryDeleter& Deleter,
+ WorkerThreadPool& WorkerPool,
+ bool Sandboxed,
+ int32_t MaxConcurrentActions)
+: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions)
+, m_Sandboxed(Sandboxed)
+{
+ // Restore SIGCHLD to default behavior so waitpid() can properly collect
+ // child exit status. zenserver/main.cpp sets SIGCHLD to SIG_IGN which
+ // causes the kernel to auto-reap children, making waitpid() return
+ // -1/ECHILD instead of the exit status we need.
+ struct sigaction Action = {};
+ sigemptyset(&Action.sa_mask);
+ Action.sa_handler = SIG_DFL;
+ sigaction(SIGCHLD, &Action, nullptr);
+
+ if (m_Sandboxed)
+ {
+ ZEN_INFO("Seatbelt sandboxing enabled for child processes");
+ }
+}
+
+SubmitResult
+MacProcessRunner::SubmitAction(Ref<RunnerAction> Action)
+{
+ ZEN_TRACE_CPU("MacProcessRunner::SubmitAction");
+ std::optional<PreparedAction> Prepared = PrepareActionSubmission(Action);
+
+ if (!Prepared)
+ {
+ return SubmitResult{.IsAccepted = false};
+ }
+
+ // Build environment array from worker descriptor
+
+ CbObject WorkerDescription = Prepared->WorkerPackage.GetObject();
+
+ std::vector<std::string> EnvStrings;
+ for (auto& It : WorkerDescription["environment"sv])
+ {
+ EnvStrings.emplace_back(It.AsString());
+ }
+
+ std::vector<char*> Envp;
+ Envp.reserve(EnvStrings.size() + 1);
+ for (auto& Str : EnvStrings)
+ {
+ Envp.push_back(Str.data());
+ }
+ Envp.push_back(nullptr);
+
+ // Build argv: <worker_exe_path> -Build=build.action
+
+ std::string_view ExecPath = WorkerDescription["path"sv].AsString();
+ std::filesystem::path ExePath = Prepared->WorkerPath / std::filesystem::path(ExecPath);
+ std::string ExePathStr = ExePath.string();
+ std::string BuildArg = "-Build=build.action";
+
+ std::vector<char*> ArgV;
+ ArgV.push_back(ExePathStr.data());
+ ArgV.push_back(BuildArg.data());
+ ArgV.push_back(nullptr);
+
+ ZEN_DEBUG("Executing: {} {} (sandboxed={})", ExePathStr, BuildArg, m_Sandboxed);
+
+ std::string SandboxPathStr = Prepared->SandboxPath.string();
+ std::string WorkerPathStr = Prepared->WorkerPath.string();
+
+ // Pre-fork: build sandbox profile and create error pipe
+ std::string SandboxProfile;
+ int ErrorPipe[2] = {-1, -1};
+
+ if (m_Sandboxed)
+ {
+ SandboxProfile = BuildSandboxProfile(SandboxPathStr, WorkerPathStr);
+
+ if (pipe(ErrorPipe) != 0)
+ {
+ throw zen::runtime_error("pipe() for sandbox error pipe failed: {}", strerror(errno));
+ }
+ fcntl(ErrorPipe[0], F_SETFD, FD_CLOEXEC);
+ fcntl(ErrorPipe[1], F_SETFD, FD_CLOEXEC);
+ }
+
+ pid_t ChildPid = fork();
+
+ if (ChildPid < 0)
+ {
+ int SavedErrno = errno;
+ if (m_Sandboxed)
+ {
+ close(ErrorPipe[0]);
+ close(ErrorPipe[1]);
+ }
+ throw zen::runtime_error("fork() failed: {}", strerror(SavedErrno));
+ }
+
+ if (ChildPid == 0)
+ {
+ // Child process
+
+ if (m_Sandboxed)
+ {
+ // Close read end of error pipe — child only writes
+ close(ErrorPipe[0]);
+
+ // Apply Seatbelt sandbox profile
+ char* ErrorBuf = nullptr;
+ if (sandbox_init(SandboxProfile.c_str(), 0, &ErrorBuf) != 0)
+ {
+ // sandbox_init failed — write error to pipe and exit
+ if (ErrorBuf)
+ {
+ WriteErrorAndExit(ErrorPipe[1], ErrorBuf, 0);
+ // WriteErrorAndExit does not return, but sandbox_free_error
+ // is not needed since we _exit
+ }
+ WriteErrorAndExit(ErrorPipe[1], "sandbox_init failed", errno);
+ }
+ if (ErrorBuf)
+ {
+ sandbox_free_error(ErrorBuf);
+ }
+
+ if (chdir(SandboxPathStr.c_str()) != 0)
+ {
+ WriteErrorAndExit(ErrorPipe[1], "chdir to sandbox failed", errno);
+ }
+
+ execve(ExePathStr.c_str(), ArgV.data(), Envp.data());
+
+ WriteErrorAndExit(ErrorPipe[1], "execve failed", errno);
+ }
+ else
+ {
+ if (chdir(SandboxPathStr.c_str()) != 0)
+ {
+ _exit(127);
+ }
+
+ execve(ExePathStr.c_str(), ArgV.data(), Envp.data());
+ _exit(127);
+ }
+ }
+
+ // Parent process
+
+ if (m_Sandboxed)
+ {
+ // Close write end of error pipe — parent only reads
+ close(ErrorPipe[1]);
+
+ // Read from error pipe. If execve succeeded, pipe was closed by O_CLOEXEC
+ // and read returns 0. If setup failed, child wrote an error message.
+ char ErrBuf[512];
+ ssize_t BytesRead = read(ErrorPipe[0], ErrBuf, sizeof(ErrBuf) - 1);
+ close(ErrorPipe[0]);
+
+ if (BytesRead > 0)
+ {
+ // Sandbox setup or execve failed
+ ErrBuf[BytesRead] = '\0';
+
+ // Reap the child (it called _exit(127))
+ waitpid(ChildPid, nullptr, 0);
+
+ // Clean up the sandbox in the background
+ m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(Prepared->SandboxPath));
+
+ ZEN_ERROR("Sandbox setup failed for action {}: {}", Action->ActionLsn, ErrBuf);
+
+ Action->SetActionState(RunnerAction::State::Failed);
+ return SubmitResult{.IsAccepted = false};
+ }
+ }
+
+ // Store child pid as void* (same convention as zencore/process.cpp)
+
+ Ref<RunningAction> NewAction{new RunningAction()};
+ NewAction->Action = Action;
+ NewAction->ProcessHandle = reinterpret_cast<void*>(static_cast<intptr_t>(ChildPid));
+ NewAction->SandboxPath = std::move(Prepared->SandboxPath);
+
+ {
+ RwLock::ExclusiveLockScope _(m_RunningLock);
+ m_RunningMap[Prepared->ActionLsn] = std::move(NewAction);
+ }
+
+ Action->SetActionState(RunnerAction::State::Running);
+
+ return SubmitResult{.IsAccepted = true};
+}
+
+void
+MacProcessRunner::SweepRunningActions()
+{
+ ZEN_TRACE_CPU("MacProcessRunner::SweepRunningActions");
+ std::vector<Ref<RunningAction>> CompletedActions;
+
+ m_RunningLock.WithExclusiveLock([&] {
+ for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;)
+ {
+ Ref<RunningAction> Running = It->second;
+
+ pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));
+ int Status = 0;
+
+ pid_t Result = waitpid(Pid, &Status, WNOHANG);
+
+ if (Result == Pid)
+ {
+ if (WIFEXITED(Status))
+ {
+ Running->ExitCode = WEXITSTATUS(Status);
+ }
+ else if (WIFSIGNALED(Status))
+ {
+ Running->ExitCode = 128 + WTERMSIG(Status);
+ }
+ else
+ {
+ Running->ExitCode = 1;
+ }
+
+ Running->ProcessHandle = nullptr;
+
+ CompletedActions.push_back(std::move(Running));
+ It = m_RunningMap.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+ });
+
+ ProcessCompletedActions(CompletedActions);
+}
+
+void
+MacProcessRunner::CancelRunningActions()
+{
+ ZEN_TRACE_CPU("MacProcessRunner::CancelRunningActions");
+ Stopwatch Timer;
+ std::unordered_map<int, Ref<RunningAction>> RunningMap;
+
+ m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); });
+
+ if (RunningMap.empty())
+ {
+ return;
+ }
+
+ ZEN_INFO("cancelling all running actions");
+
+ // Send SIGTERM to all running processes first
+
+ for (const auto& [Lsn, Running] : RunningMap)
+ {
+ pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));
+
+ if (kill(Pid, SIGTERM) != 0)
+ {
+ ZEN_WARN("kill(SIGTERM) for LSN {} (pid {}) failed: {}", Running->Action->ActionLsn, Pid, strerror(errno));
+ }
+ }
+
+ // Wait for all processes, regardless of whether SIGTERM succeeded, then clean up.
+
+ for (auto& [Lsn, Running] : RunningMap)
+ {
+ pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));
+
+ // Poll for up to 2 seconds
+ bool Exited = false;
+ for (int i = 0; i < 20; ++i)
+ {
+ int Status = 0;
+ pid_t WaitResult = waitpid(Pid, &Status, WNOHANG);
+ if (WaitResult == Pid)
+ {
+ Exited = true;
+ ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn);
+ break;
+ }
+ usleep(100000); // 100ms
+ }
+
+ if (!Exited)
+ {
+ ZEN_WARN("LSN {}: process did not exit after SIGTERM, sending SIGKILL", Running->Action->ActionLsn);
+ kill(Pid, SIGKILL);
+ waitpid(Pid, nullptr, 0);
+ }
+
+ m_DeferredDeleter.Enqueue(Running->Action->ActionLsn, std::move(Running->SandboxPath));
+ Running->Action->SetActionState(RunnerAction::State::Failed);
+ }
+
+ ZEN_INFO("DONE - cancelled {} running processes (took {})", RunningMap.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+}
+
+bool
+MacProcessRunner::CancelAction(int ActionLsn)
+{
+ ZEN_TRACE_CPU("MacProcessRunner::CancelAction");
+
+ // Hold the shared lock while sending the signal to prevent the sweep thread
+ // from reaping the PID (via waitpid) between our lookup and kill(). Without
+ // the lock held, the PID could be recycled by the kernel and we'd signal an
+ // unrelated process.
+ bool Sent = false;
+
+ m_RunningLock.WithSharedLock([&] {
+ auto It = m_RunningMap.find(ActionLsn);
+ if (It == m_RunningMap.end())
+ {
+ return;
+ }
+
+ Ref<RunningAction> Target = It->second;
+ if (!Target->ProcessHandle)
+ {
+ return;
+ }
+
+ pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Target->ProcessHandle));
+
+ if (kill(Pid, SIGTERM) != 0)
+ {
+ ZEN_WARN("CancelAction: kill(SIGTERM) for LSN {} (pid {}) failed: {}", ActionLsn, Pid, strerror(errno));
+ return;
+ }
+
+ ZEN_DEBUG("CancelAction: sent SIGTERM to LSN {} (pid {})", ActionLsn, Pid);
+ Sent = true;
+ });
+
+ // The monitor thread will pick up the process exit and mark the action as Failed.
+ return Sent;
+}
+
+void
+MacProcessRunner::SampleProcessCpu(RunningAction& Running)
+{
+ const pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running.ProcessHandle));
+
+ struct proc_taskinfo Info;
+ if (proc_pidinfo(Pid, PROC_PIDTASKINFO, 0, &Info, sizeof(Info)) <= 0)
+ {
+ return;
+ }
+
+ // pti_total_user and pti_total_system are in nanoseconds
+ const uint64_t CurrentOsTicks = Info.pti_total_user + Info.pti_total_system;
+ const uint64_t NowTicks = GetHifreqTimerValue();
+
+ // Cumulative CPU seconds (absolute, available from first sample): ns → seconds
+ Running.Action->CpuSeconds.store(static_cast<float>(static_cast<double>(CurrentOsTicks) / 1'000'000'000.0), std::memory_order_relaxed);
+
+ if (Running.LastCpuSampleTicks != 0 && Running.LastCpuOsTicks != 0)
+ {
+ const uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(NowTicks - Running.LastCpuSampleTicks);
+ if (ElapsedMs > 0)
+ {
+ const uint64_t DeltaOsTicks = CurrentOsTicks - Running.LastCpuOsTicks;
+ // ns → ms: divide by 1,000,000; then as percent of elapsed ms
+ const float CpuPct = static_cast<float>(static_cast<double>(DeltaOsTicks) / 1'000'000.0 / ElapsedMs * 100.0);
+ Running.Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed);
+ }
+ }
+
+ Running.LastCpuSampleTicks = NowTicks;
+ Running.LastCpuOsTicks = CurrentOsTicks;
+}
+
+} // namespace zen::compute
+
+#endif