aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/localrunner.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-02-18 11:28:03 +0100
committerGitHub Enterprise <[email protected]>2026-02-18 11:28:03 +0100
commit149a5c2faa8d59290b8b44717e504532e906aae2 (patch)
tree9c875f1fd89f65f939bf8f6ef67b506565be845c /src/zencompute/localrunner.cpp
parentadd selective request logging support to http.sys (#762) (diff)
downloadzen-149a5c2faa8d59290b8b44717e504532e906aae2.tar.xz
zen-149a5c2faa8d59290b8b44717e504532e906aae2.zip
structured compute basics (#714)
this change adds the `zencompute` component, which can be used to distribute work dispatched from UE using the DDB (Derived Data Build) APIs via zenserver this change also adds a distinct zenserver compute mode (`zenserver compute`) which is intended to be used for leaf compute nodes to exercise the compute functionality without directly involving UE, a `zen exec` subcommand is also added, which can be used to feed replays through the system all new functionality is considered *experimental* and disabled by default at this time, behind the `zencompute` option in xmake config
Diffstat (limited to 'src/zencompute/localrunner.cpp')
-rw-r--r--src/zencompute/localrunner.cpp722
1 files changed, 722 insertions, 0 deletions
diff --git a/src/zencompute/localrunner.cpp b/src/zencompute/localrunner.cpp
new file mode 100644
index 000000000..9a27f3f3d
--- /dev/null
+++ b/src/zencompute/localrunner.cpp
@@ -0,0 +1,722 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "localrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/system.h>
+# include <zencore/scopeguard.h>
+# include <zencore/timer.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir)
+: FunctionRunner(BaseDir)
+, m_Log(logging::Get("local_exec"))
+, m_ChunkResolver(Resolver)
+, m_WorkerPath(std::filesystem::weakly_canonical(BaseDir / "workers"))
+, m_SandboxPath(std::filesystem::weakly_canonical(BaseDir / "scratch"))
+{
+ SystemMetrics Sm = GetSystemMetricsForReporting();
+
+ m_MaxRunningActions = Sm.LogicalProcessorCount * 2;
+
+ ZEN_INFO("Max concurrent action count: {}", m_MaxRunningActions);
+
+ bool DidCleanup = false;
+
+ if (std::filesystem::is_directory(m_ActionsPath))
+ {
+ ZEN_INFO("Cleaning '{}'", m_ActionsPath);
+
+ std::error_code Ec;
+ CleanDirectory(m_ActionsPath, /* ForceRemoveReadOnlyFiles */ true, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("Unable to clean '{}': {}", m_ActionsPath, Ec.message());
+ }
+
+ DidCleanup = true;
+ }
+
+ if (std::filesystem::is_directory(m_SandboxPath))
+ {
+ ZEN_INFO("Cleaning '{}'", m_SandboxPath);
+ std::error_code Ec;
+ CleanDirectory(m_SandboxPath, /* ForceRemoveReadOnlyFiles */ true, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("Unable to clean '{}': {}", m_SandboxPath, Ec.message());
+ }
+
+ DidCleanup = true;
+ }
+
+ // We clean out all workers on startup since we can't know they are good. They could be bad
+ // due to tampering, malware (which I also mean to include AV and antimalware software) or
+ // other processes we have no control over
+ if (std::filesystem::is_directory(m_WorkerPath))
+ {
+ ZEN_INFO("Cleaning '{}'", m_WorkerPath);
+ std::error_code Ec;
+ CleanDirectory(m_WorkerPath, /* ForceRemoveReadOnlyFiles */ true, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("Unable to clean '{}': {}", m_WorkerPath, Ec.message());
+ }
+
+ DidCleanup = true;
+ }
+
+ if (DidCleanup)
+ {
+ ZEN_INFO("Cleanup complete");
+ }
+
+ m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this};
+
+# if ZEN_PLATFORM_WINDOWS
+ // Suppress any error dialogs caused by missing dependencies
+ UINT OldMode = ::SetErrorMode(0);
+ ::SetErrorMode(OldMode | SEM_FAILCRITICALERRORS);
+# endif
+
+ m_AcceptNewActions = true;
+}
+
+LocalProcessRunner::~LocalProcessRunner()
+{
+ try
+ {
+ Shutdown();
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("exception during local process runner shutdown: {}", Ex.what());
+ }
+}
+
+void
+LocalProcessRunner::Shutdown()
+{
+ m_AcceptNewActions = false;
+
+ m_MonitorThreadEnabled = false;
+ m_MonitorThreadEvent.Set();
+ if (m_MonitorThread.joinable())
+ {
+ m_MonitorThread.join();
+ }
+
+ CancelRunningActions();
+}
+
+std::filesystem::path
+LocalProcessRunner::CreateNewSandbox()
+{
+ std::string UniqueId = std::to_string(++m_SandboxCounter);
+ std::filesystem::path Path = m_SandboxPath / UniqueId;
+ zen::CreateDirectories(Path);
+
+ return Path;
+}
+
+void
+LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage)
+{
+ if (m_DumpActions)
+ {
+ CbObject WorkerDescriptor = WorkerPackage.GetObject();
+ const IoHash& WorkerId = WorkerPackage.GetObjectHash();
+
+ std::string UniqueId = fmt::format("worker_{}"sv, WorkerId);
+ std::filesystem::path Path = m_ActionsPath / UniqueId;
+
+ zen::WriteFile(Path / "worker.ucb", WorkerDescriptor.GetBuffer().AsIoBuffer());
+
+ ManifestWorker(WorkerPackage, Path / "tree", [&](const IoHash& Cid, CompressedBuffer& ChunkBuffer) {
+ std::filesystem::path ChunkPath = Path / "chunks" / Cid.ToHexString();
+ zen::WriteFile(ChunkPath, ChunkBuffer.GetCompressed());
+ });
+
+ ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, Path);
+ }
+}
+
+size_t
+LocalProcessRunner::QueryCapacity()
+{
+ // Estimate how much more work we're ready to accept
+
+ RwLock::SharedLockScope _{m_RunningLock};
+
+ if (!m_AcceptNewActions)
+ {
+ return 0;
+ }
+
+ size_t RunningCount = m_RunningMap.size();
+
+ if (RunningCount >= size_t(m_MaxRunningActions))
+ {
+ return 0;
+ }
+
+ return m_MaxRunningActions - RunningCount;
+}
+
+std::vector<SubmitResult>
+LocalProcessRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+ std::vector<SubmitResult> Results;
+
+ for (const Ref<RunnerAction>& Action : Actions)
+ {
+ Results.push_back(SubmitAction(Action));
+ }
+
+ return Results;
+}
+
+SubmitResult
+LocalProcessRunner::SubmitAction(Ref<RunnerAction> Action)
+{
+ // Verify whether we can accept more work
+
+ {
+ RwLock::SharedLockScope _{m_RunningLock};
+
+ if (!m_AcceptNewActions)
+ {
+ return SubmitResult{.IsAccepted = false};
+ }
+
+ if (m_RunningMap.size() >= size_t(m_MaxRunningActions))
+ {
+ return SubmitResult{.IsAccepted = false};
+ }
+ }
+
+ using namespace std::literals;
+
+ // Each enqueued action is assigned an integer index (logical sequence number),
+ // which we use as a key for tracking data structures and as an opaque id which
+ // may be used by clients to reference the scheduled action
+
+ const int32_t ActionLsn = Action->ActionLsn;
+ const CbObject& ActionObj = Action->ActionObj;
+ const IoHash ActionId = ActionObj.GetHash();
+
+ MaybeDumpAction(ActionLsn, ActionObj);
+
+ std::filesystem::path SandboxPath = CreateNewSandbox();
+
+ CbPackage WorkerPackage = Action->Worker.Descriptor;
+
+ std::filesystem::path WorkerPath = ManifestWorker(Action->Worker);
+
+ // Write out action
+
+ zen::WriteFile(SandboxPath / "build.action", ActionObj.GetBuffer().AsIoBuffer());
+
+ // Manifest inputs in sandbox
+
+ ActionObj.IterateAttachments([&](CbFieldView Field) {
+ const IoHash Cid = Field.AsHash();
+ std::filesystem::path FilePath{SandboxPath / "Inputs"sv / Cid.ToHexString()};
+ IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(Cid);
+
+ if (!DataBuffer)
+ {
+ throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid));
+ }
+
+ zen::WriteFile(FilePath, DataBuffer);
+ });
+
+# if ZEN_PLATFORM_WINDOWS
+ // Set up environment variables
+
+ StringBuilder<1024> EnvironmentBlock;
+
+ CbObject WorkerDescription = WorkerPackage.GetObject();
+
+ for (auto& It : WorkerDescription["environment"sv])
+ {
+ EnvironmentBlock.Append(It.AsString());
+ EnvironmentBlock.Append('\0');
+ }
+ EnvironmentBlock.Append('\0');
+ EnvironmentBlock.Append('\0');
+
+ // Execute process - this spawns the child process immediately without waiting
+ // for completion
+
+ std::string_view ExecPath = WorkerDescription["path"sv].AsString();
+ std::filesystem::path ExePath = WorkerPath / std::filesystem::path(ExecPath).make_preferred();
+
+ ExtendableWideStringBuilder<512> CommandLine;
+ CommandLine.Append(L'"');
+ CommandLine.Append(ExePath.c_str());
+ CommandLine.Append(L'"');
+ CommandLine.Append(L" -Build=build.action");
+
+ LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr;
+ LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr;
+ BOOL bInheritHandles = FALSE;
+ DWORD dwCreationFlags = 0;
+
+ STARTUPINFO StartupInfo{};
+ StartupInfo.cb = sizeof StartupInfo;
+
+ PROCESS_INFORMATION ProcessInformation{};
+
+ ZEN_DEBUG("Executing: {}", WideToUtf8(CommandLine.c_str()));
+
+ CommandLine.EnsureNulTerminated();
+
+ BOOL Success = CreateProcessW(nullptr,
+ CommandLine.Data(),
+ lpProcessAttributes,
+ lpThreadAttributes,
+ bInheritHandles,
+ dwCreationFlags,
+ (LPVOID)EnvironmentBlock.Data(), // Environment block
+ SandboxPath.c_str(), // Current directory
+ &StartupInfo,
+ /* out */ &ProcessInformation);
+
+ if (!Success)
+ {
+ // TODO: this is probably not the best way to report failure. The return
+ // object should include a failure state and context
+
+ zen::ThrowLastError("Unable to launch process" /* TODO: Add context */);
+ }
+
+ CloseHandle(ProcessInformation.hThread);
+
+ Ref<RunningAction> NewAction{new RunningAction()};
+ NewAction->Action = Action;
+ NewAction->ProcessHandle = ProcessInformation.hProcess;
+ NewAction->SandboxPath = std::move(SandboxPath);
+
+ {
+ RwLock::ExclusiveLockScope _(m_RunningLock);
+
+ m_RunningMap[ActionLsn] = std::move(NewAction);
+ }
+
+ Action->SetActionState(RunnerAction::State::Running);
+# else
+ ZEN_UNUSED(ActionId);
+
+ ZEN_NOT_IMPLEMENTED();
+
+ int ExitCode = 0;
+# endif
+
+ return SubmitResult{.IsAccepted = true};
+}
+
+size_t
+LocalProcessRunner::GetSubmittedActionCount()
+{
+ RwLock::SharedLockScope _(m_RunningLock);
+ return m_RunningMap.size();
+}
+
+std::filesystem::path
+LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker)
+{
+ RwLock::SharedLockScope _(m_WorkerLock);
+
+ std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId);
+
+ if (!std::filesystem::exists(WorkerDir))
+ {
+ _.ReleaseNow();
+
+ RwLock::ExclusiveLockScope $(m_WorkerLock);
+
+ if (!std::filesystem::exists(WorkerDir))
+ {
+ ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {});
+ }
+ }
+
+ return WorkerDir;
+}
+
+void
+LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromPackage,
+ CbObjectView FileEntry,
+ const std::filesystem::path& SandboxRootPath,
+ std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback)
+{
+ std::string_view Name = FileEntry["name"sv].AsString();
+ const IoHash ChunkHash = FileEntry["hash"sv].AsHash();
+ const uint64_t Size = FileEntry["size"sv].AsUInt64();
+
+ CompressedBuffer Compressed;
+
+ if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash))
+ {
+ Compressed = Attachment->AsCompressedBinary();
+ }
+ else
+ {
+ IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(ChunkHash);
+
+ if (!DataBuffer)
+ {
+ throw std::runtime_error(fmt::format("worker chunk '{}' missing", ChunkHash));
+ }
+
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ Compressed = CompressedBuffer::FromCompressed(SharedBuffer{DataBuffer}, DataRawHash, DataRawSize);
+
+ if (DataRawSize != Size)
+ {
+ throw std::runtime_error(
+ fmt::format("worker chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size));
+ }
+ }
+
+ ChunkReferenceCallback(ChunkHash, Compressed);
+
+ std::filesystem::path FilePath{SandboxRootPath / std::filesystem::path(Name).make_preferred()};
+
+ SharedBuffer Decompressed = Compressed.Decompress();
+ zen::WriteFile(FilePath, Decompressed.AsIoBuffer());
+}
+
+void
+LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage,
+ const std::filesystem::path& SandboxPath,
+ std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback)
+{
+ CbObject WorkerDescription = WorkerPackage.GetObject();
+
+ // Manifest worker in Sandbox
+
+ for (auto& It : WorkerDescription["executables"sv])
+ {
+ DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback);
+ }
+
+ for (auto& It : WorkerDescription["dirs"sv])
+ {
+ std::string_view Name = It.AsString();
+ std::filesystem::path DirPath{SandboxPath / std::filesystem::path(Name).make_preferred()};
+ zen::CreateDirectories(DirPath);
+ }
+
+ for (auto& It : WorkerDescription["files"sv])
+ {
+ DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback);
+ }
+
+ WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer());
+}
+
+CbPackage
+LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath)
+{
+ std::filesystem::path OutputFile = SandboxPath / "build.output";
+ FileContents OutputData = zen::ReadFile(OutputFile);
+
+ if (OutputData.ErrorCode)
+ {
+ throw std::system_error(OutputData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputFile));
+ }
+
+ CbPackage OutputPackage;
+ CbObject Output = zen::LoadCompactBinaryObject(OutputData.Flatten());
+
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalRawAttachmentBytes = 0;
+
+ Output.IterateAttachments([&](CbFieldView Field) {
+ IoHash Hash = Field.AsHash();
+ std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()};
+ FileContents ChunkData = zen::ReadFile(OutputPath);
+
+ if (ChunkData.ErrorCode)
+ {
+ throw std::system_error(ChunkData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputPath));
+ }
+
+ uint64_t ChunkDataRawSize = 0;
+ IoHash ChunkDataHash;
+ CompressedBuffer AttachmentBuffer =
+ CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Flatten()), ChunkDataHash, ChunkDataRawSize);
+
+ if (!AttachmentBuffer)
+ {
+ throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)");
+ }
+
+ TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
+ TotalRawAttachmentBytes += ChunkDataRawSize;
+
+ CbAttachment Attachment(std::move(AttachmentBuffer), ChunkDataHash);
+ OutputPackage.AddAttachment(Attachment);
+ });
+
+ OutputPackage.SetObject(Output);
+
+ ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)",
+ OutputPackage.GetAttachments().size(),
+ NiceBytes(TotalAttachmentBytes),
+ NiceBytes(TotalRawAttachmentBytes));
+
+ return OutputPackage;
+}
+
+void
+LocalProcessRunner::MonitorThreadFunction()
+{
+ SetCurrentThreadName("LocalProcessRunner_Monitor");
+
+ auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); });
+
+ do
+ {
+ // On Windows it's possible to wait on process handles, so we wait for either a process to exit
+ // or for the monitor event to be signaled (which indicates we should check for cancellation
+ // or shutdown). This could be further improved by using a completion port and registering process
+ // handles with it, but this is a reasonable first implementation given that we shouldn't be dealing
+ // with an enormous number of concurrent processes.
+ //
+ // On other platforms we just wait on the monitor event and poll for process exits at intervals.
+# if ZEN_PLATFORM_WINDOWS
+ auto WaitOnce = [&] {
+ HANDLE WaitHandles[MAXIMUM_WAIT_OBJECTS];
+
+ uint32_t NumHandles = 0;
+
+ WaitHandles[NumHandles++] = m_MonitorThreadEvent.GetWindowsHandle();
+
+ m_RunningLock.WithSharedLock([&] {
+ for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd && NumHandles < MAXIMUM_WAIT_OBJECTS; ++It)
+ {
+ Ref<RunningAction> Action = It->second;
+
+ WaitHandles[NumHandles++] = Action->ProcessHandle;
+ }
+ });
+
+ DWORD WaitResult = WaitForMultipleObjects(NumHandles, WaitHandles, FALSE, 1000);
+
+ // return true if a handle was signaled
+ return (WaitResult <= NumHandles);
+ };
+# else
+ auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(1000); };
+# endif
+
+ while (!WaitOnce())
+ {
+ if (m_MonitorThreadEnabled == false)
+ {
+ return;
+ }
+
+ SweepRunningActions();
+ }
+
+ // Signal received
+
+ SweepRunningActions();
+ } while (m_MonitorThreadEnabled);
+}
+
+void
+LocalProcessRunner::CancelRunningActions()
+{
+ Stopwatch Timer;
+ std::unordered_map<int, Ref<RunningAction>> RunningMap;
+
+ m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); });
+
+ if (RunningMap.empty())
+ {
+ return;
+ }
+
+ ZEN_INFO("cancelling all running actions");
+
+ // For expedience we initiate the process termination for all known
+ // processes before attempting to wait for them to exit.
+
+ std::vector<int> TerminatedLsnList;
+
+ for (const auto& Kv : RunningMap)
+ {
+ Ref<RunningAction> Action = Kv.second;
+
+ // Terminate running process
+
+# if ZEN_PLATFORM_WINDOWS
+ BOOL Success = TerminateProcess(Action->ProcessHandle, 222);
+
+ if (Success)
+ {
+ TerminatedLsnList.push_back(Kv.first);
+ }
+ else
+ {
+ DWORD LastError = GetLastError();
+
+ if (LastError != ERROR_ACCESS_DENIED)
+ {
+ ZEN_WARN("TerminateProcess for LSN {} not successful: {}", Action->Action->ActionLsn, GetSystemErrorAsString(LastError));
+ }
+ }
+# else
+ ZEN_NOT_IMPLEMENTED("need to implement process termination");
+# endif
+ }
+
+ // We only post results for processes we have terminated, in order
+ // to avoid multiple results getting posted for the same action
+
+ for (int Lsn : TerminatedLsnList)
+ {
+ if (auto It = RunningMap.find(Lsn); It != RunningMap.end())
+ {
+ Ref<RunningAction> Running = It->second;
+
+# if ZEN_PLATFORM_WINDOWS
+ if (Running->ProcessHandle != INVALID_HANDLE_VALUE)
+ {
+ DWORD WaitResult = WaitForSingleObject(Running->ProcessHandle, 2000);
+
+ if (WaitResult != WAIT_OBJECT_0)
+ {
+ ZEN_WARN("wait for LSN {}: process exit did not succeed, result = {}", Running->Action->ActionLsn, WaitResult);
+ }
+ else
+ {
+ ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn);
+ }
+ }
+# endif
+
+ // Clean up and post error result
+
+ DeleteDirectories(Running->SandboxPath);
+ Running->Action->SetActionState(RunnerAction::State::Failed);
+ }
+ }
+
+ ZEN_INFO("DONE - cancelled {} running processes (took {})", TerminatedLsnList.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+}
+
+void
+LocalProcessRunner::SweepRunningActions()
+{
+ std::vector<Ref<RunningAction>> CompletedActions;
+
+ m_RunningLock.WithExclusiveLock([&] {
+ // TODO: It would be good to not hold the exclusive lock while making
+ // system calls and other expensive operations.
+
+ for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;)
+ {
+ Ref<RunningAction> Action = It->second;
+
+# if ZEN_PLATFORM_WINDOWS
+ DWORD ExitCode = 0;
+ BOOL IsSuccess = GetExitCodeProcess(Action->ProcessHandle, &ExitCode);
+
+ if (IsSuccess && ExitCode != STILL_ACTIVE)
+ {
+ CloseHandle(Action->ProcessHandle);
+ Action->ProcessHandle = INVALID_HANDLE_VALUE;
+
+ CompletedActions.push_back(std::move(Action));
+ It = m_RunningMap.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+# else
+ // TODO: implement properly for Mac/Linux
+
+ ZEN_UNUSED(Action);
+# endif
+ }
+ });
+
+ // Notify outer. Note that this has to be done without holding any local locks
+ // otherwise we may end up with deadlocks.
+
+ for (Ref<RunningAction> Running : CompletedActions)
+ {
+ const int ActionLsn = Running->Action->ActionLsn;
+
+ if (Running->ExitCode == 0)
+ {
+ try
+ {
+ // Gather outputs
+
+ CbPackage OutputPackage = GatherActionOutputs(Running->SandboxPath);
+
+ Running->Action->SetResult(std::move(OutputPackage));
+ Running->Action->SetActionState(RunnerAction::State::Completed);
+
+ // We can delete the files at this point
+ if (!DeleteDirectories(Running->SandboxPath))
+ {
+ ZEN_WARN("Unable to delete directory '{}', this will continue to exist until service restart", Running->SandboxPath);
+ }
+
+ // Success -- continue with next iteration of the loop
+ continue;
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what());
+ }
+ }
+
+ // Failed - for now this is indicated with an empty package in
+ // the results map. We can clean out the sandbox directory immediately.
+
+ std::error_code Ec;
+ DeleteDirectories(Running->SandboxPath, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("Unable to delete sandbox directory '{}': {}", Running->SandboxPath, Ec.message());
+ }
+
+ Running->Action->SetActionState(RunnerAction::State::Failed);
+ }
+}
+
+} // namespace zen::compute
+
+#endif