// Copyright Epic Games, Inc. All Rights Reserved. #include "localrunner.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include # include # include # include # include # include # include namespace zen::compute { using namespace std::literals; LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir) : FunctionRunner(BaseDir) , m_Log(logging::Get("local_exec")) , m_ChunkResolver(Resolver) , m_WorkerPath(std::filesystem::weakly_canonical(BaseDir / "workers")) , m_SandboxPath(std::filesystem::weakly_canonical(BaseDir / "scratch")) { SystemMetrics Sm = GetSystemMetricsForReporting(); m_MaxRunningActions = Sm.LogicalProcessorCount * 2; ZEN_INFO("Max concurrent action count: {}", m_MaxRunningActions); bool DidCleanup = false; if (std::filesystem::is_directory(m_ActionsPath)) { ZEN_INFO("Cleaning '{}'", m_ActionsPath); std::error_code Ec; CleanDirectory(m_ActionsPath, /* ForceRemoveReadOnlyFiles */ true, Ec); if (Ec) { ZEN_WARN("Unable to clean '{}': {}", m_ActionsPath, Ec.message()); } DidCleanup = true; } if (std::filesystem::is_directory(m_SandboxPath)) { ZEN_INFO("Cleaning '{}'", m_SandboxPath); std::error_code Ec; CleanDirectory(m_SandboxPath, /* ForceRemoveReadOnlyFiles */ true, Ec); if (Ec) { ZEN_WARN("Unable to clean '{}': {}", m_SandboxPath, Ec.message()); } DidCleanup = true; } // We clean out all workers on startup since we can't know they are good. They could be bad // due to tampering, malware (which I also mean to include AV and antimalware software) or // other processes we have no control over if (std::filesystem::is_directory(m_WorkerPath)) { ZEN_INFO("Cleaning '{}'", m_WorkerPath); std::error_code Ec; CleanDirectory(m_WorkerPath, /* ForceRemoveReadOnlyFiles */ true, Ec); if (Ec) { ZEN_WARN("Unable to clean '{}': {}", m_WorkerPath, Ec.message()); } DidCleanup = true; } if (DidCleanup) { ZEN_INFO("Cleanup complete"); } m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this}; # if ZEN_PLATFORM_WINDOWS // Suppress any error dialogs caused by missing dependencies UINT OldMode = ::SetErrorMode(0); ::SetErrorMode(OldMode | SEM_FAILCRITICALERRORS); # endif m_AcceptNewActions = true; } LocalProcessRunner::~LocalProcessRunner() { try { Shutdown(); } catch (std::exception& Ex) { ZEN_WARN("exception during local process runner shutdown: {}", Ex.what()); } } void LocalProcessRunner::Shutdown() { m_AcceptNewActions = false; m_MonitorThreadEnabled = false; m_MonitorThreadEvent.Set(); if (m_MonitorThread.joinable()) { m_MonitorThread.join(); } CancelRunningActions(); } std::filesystem::path LocalProcessRunner::CreateNewSandbox() { std::string UniqueId = std::to_string(++m_SandboxCounter); std::filesystem::path Path = m_SandboxPath / UniqueId; zen::CreateDirectories(Path); return Path; } void LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage) { if (m_DumpActions) { CbObject WorkerDescriptor = WorkerPackage.GetObject(); const IoHash& WorkerId = WorkerPackage.GetObjectHash(); std::string UniqueId = fmt::format("worker_{}"sv, WorkerId); std::filesystem::path Path = m_ActionsPath / UniqueId; zen::WriteFile(Path / "worker.ucb", WorkerDescriptor.GetBuffer().AsIoBuffer()); ManifestWorker(WorkerPackage, Path / "tree", [&](const IoHash& Cid, CompressedBuffer& ChunkBuffer) { std::filesystem::path ChunkPath = Path / "chunks" / Cid.ToHexString(); zen::WriteFile(ChunkPath, ChunkBuffer.GetCompressed()); }); ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, Path); } } size_t LocalProcessRunner::QueryCapacity() { // Estimate how much more work we're ready to accept RwLock::SharedLockScope _{m_RunningLock}; if (!m_AcceptNewActions) { return 0; } size_t RunningCount = m_RunningMap.size(); if (RunningCount >= size_t(m_MaxRunningActions)) { return 0; } return m_MaxRunningActions - RunningCount; } std::vector LocalProcessRunner::SubmitActions(const std::vector>& Actions) { std::vector Results; for (const Ref& Action : Actions) { Results.push_back(SubmitAction(Action)); } return Results; } SubmitResult LocalProcessRunner::SubmitAction(Ref Action) { // Verify whether we can accept more work { RwLock::SharedLockScope _{m_RunningLock}; if (!m_AcceptNewActions) { return SubmitResult{.IsAccepted = false}; } if (m_RunningMap.size() >= size_t(m_MaxRunningActions)) { return SubmitResult{.IsAccepted = false}; } } using namespace std::literals; // Each enqueued action is assigned an integer index (logical sequence number), // which we use as a key for tracking data structures and as an opaque id which // may be used by clients to reference the scheduled action const int32_t ActionLsn = Action->ActionLsn; const CbObject& ActionObj = Action->ActionObj; const IoHash ActionId = ActionObj.GetHash(); MaybeDumpAction(ActionLsn, ActionObj); std::filesystem::path SandboxPath = CreateNewSandbox(); CbPackage WorkerPackage = Action->Worker.Descriptor; std::filesystem::path WorkerPath = ManifestWorker(Action->Worker); // Write out action zen::WriteFile(SandboxPath / "build.action", ActionObj.GetBuffer().AsIoBuffer()); // Manifest inputs in sandbox ActionObj.IterateAttachments([&](CbFieldView Field) { const IoHash Cid = Field.AsHash(); std::filesystem::path FilePath{SandboxPath / "Inputs"sv / Cid.ToHexString()}; IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(Cid); if (!DataBuffer) { throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid)); } zen::WriteFile(FilePath, DataBuffer); }); # if ZEN_PLATFORM_WINDOWS // Set up environment variables StringBuilder<1024> EnvironmentBlock; CbObject WorkerDescription = WorkerPackage.GetObject(); for (auto& It : WorkerDescription["environment"sv]) { EnvironmentBlock.Append(It.AsString()); EnvironmentBlock.Append('\0'); } EnvironmentBlock.Append('\0'); EnvironmentBlock.Append('\0'); // Execute process - this spawns the child process immediately without waiting // for completion std::string_view ExecPath = WorkerDescription["path"sv].AsString(); std::filesystem::path ExePath = WorkerPath / std::filesystem::path(ExecPath).make_preferred(); ExtendableWideStringBuilder<512> CommandLine; CommandLine.Append(L'"'); CommandLine.Append(ExePath.c_str()); CommandLine.Append(L'"'); CommandLine.Append(L" -Build=build.action"); LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; BOOL bInheritHandles = FALSE; DWORD dwCreationFlags = 0; STARTUPINFO StartupInfo{}; StartupInfo.cb = sizeof StartupInfo; PROCESS_INFORMATION ProcessInformation{}; ZEN_DEBUG("Executing: {}", WideToUtf8(CommandLine.c_str())); CommandLine.EnsureNulTerminated(); BOOL Success = CreateProcessW(nullptr, CommandLine.Data(), lpProcessAttributes, lpThreadAttributes, bInheritHandles, dwCreationFlags, (LPVOID)EnvironmentBlock.Data(), // Environment block SandboxPath.c_str(), // Current directory &StartupInfo, /* out */ &ProcessInformation); if (!Success) { // TODO: this is probably not the best way to report failure. The return // object should include a failure state and context zen::ThrowLastError("Unable to launch process" /* TODO: Add context */); } CloseHandle(ProcessInformation.hThread); Ref NewAction{new RunningAction()}; NewAction->Action = Action; NewAction->ProcessHandle = ProcessInformation.hProcess; NewAction->SandboxPath = std::move(SandboxPath); { RwLock::ExclusiveLockScope _(m_RunningLock); m_RunningMap[ActionLsn] = std::move(NewAction); } Action->SetActionState(RunnerAction::State::Running); # else ZEN_UNUSED(ActionId); ZEN_NOT_IMPLEMENTED(); int ExitCode = 0; # endif return SubmitResult{.IsAccepted = true}; } size_t LocalProcessRunner::GetSubmittedActionCount() { RwLock::SharedLockScope _(m_RunningLock); return m_RunningMap.size(); } std::filesystem::path LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker) { RwLock::SharedLockScope _(m_WorkerLock); std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId); if (!std::filesystem::exists(WorkerDir)) { _.ReleaseNow(); RwLock::ExclusiveLockScope $(m_WorkerLock); if (!std::filesystem::exists(WorkerDir)) { ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {}); } } return WorkerDir; } void LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromPackage, CbObjectView FileEntry, const std::filesystem::path& SandboxRootPath, std::function& ChunkReferenceCallback) { std::string_view Name = FileEntry["name"sv].AsString(); const IoHash ChunkHash = FileEntry["hash"sv].AsHash(); const uint64_t Size = FileEntry["size"sv].AsUInt64(); CompressedBuffer Compressed; if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash)) { Compressed = Attachment->AsCompressedBinary(); } else { IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(ChunkHash); if (!DataBuffer) { throw std::runtime_error(fmt::format("worker chunk '{}' missing", ChunkHash)); } uint64_t DataRawSize = 0; IoHash DataRawHash; Compressed = CompressedBuffer::FromCompressed(SharedBuffer{DataBuffer}, DataRawHash, DataRawSize); if (DataRawSize != Size) { throw std::runtime_error( fmt::format("worker chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size)); } } ChunkReferenceCallback(ChunkHash, Compressed); std::filesystem::path FilePath{SandboxRootPath / std::filesystem::path(Name).make_preferred()}; SharedBuffer Decompressed = Compressed.Decompress(); zen::WriteFile(FilePath, Decompressed.AsIoBuffer()); } void LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage, const std::filesystem::path& SandboxPath, std::function&& ChunkReferenceCallback) { CbObject WorkerDescription = WorkerPackage.GetObject(); // Manifest worker in Sandbox for (auto& It : WorkerDescription["executables"sv]) { DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback); } for (auto& It : WorkerDescription["dirs"sv]) { std::string_view Name = It.AsString(); std::filesystem::path DirPath{SandboxPath / std::filesystem::path(Name).make_preferred()}; zen::CreateDirectories(DirPath); } for (auto& It : WorkerDescription["files"sv]) { DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback); } WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer()); } CbPackage LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath) { std::filesystem::path OutputFile = SandboxPath / "build.output"; FileContents OutputData = zen::ReadFile(OutputFile); if (OutputData.ErrorCode) { throw std::system_error(OutputData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputFile)); } CbPackage OutputPackage; CbObject Output = zen::LoadCompactBinaryObject(OutputData.Flatten()); uint64_t TotalAttachmentBytes = 0; uint64_t TotalRawAttachmentBytes = 0; Output.IterateAttachments([&](CbFieldView Field) { IoHash Hash = Field.AsHash(); std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; FileContents ChunkData = zen::ReadFile(OutputPath); if (ChunkData.ErrorCode) { throw std::system_error(ChunkData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputPath)); } uint64_t ChunkDataRawSize = 0; IoHash ChunkDataHash; CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Flatten()), ChunkDataHash, ChunkDataRawSize); if (!AttachmentBuffer) { throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)"); } TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); TotalRawAttachmentBytes += ChunkDataRawSize; CbAttachment Attachment(std::move(AttachmentBuffer), ChunkDataHash); OutputPackage.AddAttachment(Attachment); }); OutputPackage.SetObject(Output); ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)", OutputPackage.GetAttachments().size(), NiceBytes(TotalAttachmentBytes), NiceBytes(TotalRawAttachmentBytes)); return OutputPackage; } void LocalProcessRunner::MonitorThreadFunction() { SetCurrentThreadName("LocalProcessRunner_Monitor"); auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); }); do { // On Windows it's possible to wait on process handles, so we wait for either a process to exit // or for the monitor event to be signaled (which indicates we should check for cancellation // or shutdown). This could be further improved by using a completion port and registering process // handles with it, but this is a reasonable first implementation given that we shouldn't be dealing // with an enormous number of concurrent processes. // // On other platforms we just wait on the monitor event and poll for process exits at intervals. # if ZEN_PLATFORM_WINDOWS auto WaitOnce = [&] { HANDLE WaitHandles[MAXIMUM_WAIT_OBJECTS]; uint32_t NumHandles = 0; WaitHandles[NumHandles++] = m_MonitorThreadEvent.GetWindowsHandle(); m_RunningLock.WithSharedLock([&] { for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd && NumHandles < MAXIMUM_WAIT_OBJECTS; ++It) { Ref Action = It->second; WaitHandles[NumHandles++] = Action->ProcessHandle; } }); DWORD WaitResult = WaitForMultipleObjects(NumHandles, WaitHandles, FALSE, 1000); // return true if a handle was signaled return (WaitResult <= NumHandles); }; # else auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(1000); }; # endif while (!WaitOnce()) { if (m_MonitorThreadEnabled == false) { return; } SweepRunningActions(); } // Signal received SweepRunningActions(); } while (m_MonitorThreadEnabled); } void LocalProcessRunner::CancelRunningActions() { Stopwatch Timer; std::unordered_map> RunningMap; m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); }); if (RunningMap.empty()) { return; } ZEN_INFO("cancelling all running actions"); // For expedience we initiate the process termination for all known // processes before attempting to wait for them to exit. std::vector TerminatedLsnList; for (const auto& Kv : RunningMap) { Ref Action = Kv.second; // Terminate running process # if ZEN_PLATFORM_WINDOWS BOOL Success = TerminateProcess(Action->ProcessHandle, 222); if (Success) { TerminatedLsnList.push_back(Kv.first); } else { DWORD LastError = GetLastError(); if (LastError != ERROR_ACCESS_DENIED) { ZEN_WARN("TerminateProcess for LSN {} not successful: {}", Action->Action->ActionLsn, GetSystemErrorAsString(LastError)); } } # else ZEN_NOT_IMPLEMENTED("need to implement process termination"); # endif } // We only post results for processes we have terminated, in order // to avoid multiple results getting posted for the same action for (int Lsn : TerminatedLsnList) { if (auto It = RunningMap.find(Lsn); It != RunningMap.end()) { Ref Running = It->second; # if ZEN_PLATFORM_WINDOWS if (Running->ProcessHandle != INVALID_HANDLE_VALUE) { DWORD WaitResult = WaitForSingleObject(Running->ProcessHandle, 2000); if (WaitResult != WAIT_OBJECT_0) { ZEN_WARN("wait for LSN {}: process exit did not succeed, result = {}", Running->Action->ActionLsn, WaitResult); } else { ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn); } } # endif // Clean up and post error result DeleteDirectories(Running->SandboxPath); Running->Action->SetActionState(RunnerAction::State::Failed); } } ZEN_INFO("DONE - cancelled {} running processes (took {})", TerminatedLsnList.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } void LocalProcessRunner::SweepRunningActions() { std::vector> CompletedActions; m_RunningLock.WithExclusiveLock([&] { // TODO: It would be good to not hold the exclusive lock while making // system calls and other expensive operations. for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;) { Ref Action = It->second; # if ZEN_PLATFORM_WINDOWS DWORD ExitCode = 0; BOOL IsSuccess = GetExitCodeProcess(Action->ProcessHandle, &ExitCode); if (IsSuccess && ExitCode != STILL_ACTIVE) { CloseHandle(Action->ProcessHandle); Action->ProcessHandle = INVALID_HANDLE_VALUE; CompletedActions.push_back(std::move(Action)); It = m_RunningMap.erase(It); } else { ++It; } # else // TODO: implement properly for Mac/Linux ZEN_UNUSED(Action); # endif } }); // Notify outer. Note that this has to be done without holding any local locks // otherwise we may end up with deadlocks. for (Ref Running : CompletedActions) { const int ActionLsn = Running->Action->ActionLsn; if (Running->ExitCode == 0) { try { // Gather outputs CbPackage OutputPackage = GatherActionOutputs(Running->SandboxPath); Running->Action->SetResult(std::move(OutputPackage)); Running->Action->SetActionState(RunnerAction::State::Completed); // We can delete the files at this point if (!DeleteDirectories(Running->SandboxPath)) { ZEN_WARN("Unable to delete directory '{}', this will continue to exist until service restart", Running->SandboxPath); } // Success -- continue with next iteration of the loop continue; } catch (std::exception& Ex) { ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what()); } } // Failed - for now this is indicated with an empty package in // the results map. We can clean out the sandbox directory immediately. std::error_code Ec; DeleteDirectories(Running->SandboxPath, Ec); if (Ec) { ZEN_WARN("Unable to delete sandbox directory '{}': {}", Running->SandboxPath, Ec.message()); } Running->Action->SetActionState(RunnerAction::State::Failed); } } } // namespace zen::compute #endif