diff options
Diffstat (limited to 'src/zencompute/localrunner.cpp')
| -rw-r--r-- | src/zencompute/localrunner.cpp | 722 |
1 files changed, 0 insertions, 722 deletions
diff --git a/src/zencompute/localrunner.cpp b/src/zencompute/localrunner.cpp deleted file mode 100644 index 9a27f3f3d..000000000 --- a/src/zencompute/localrunner.cpp +++ /dev/null @@ -1,722 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "localrunner.h" - -#if ZEN_WITH_COMPUTE_SERVICES - -# include <zencore/compactbinary.h> -# include <zencore/compactbinarybuilder.h> -# include <zencore/compactbinarypackage.h> -# include <zencore/compress.h> -# include <zencore/except.h> -# include <zencore/filesystem.h> -# include <zencore/fmtutils.h> -# include <zencore/iobuffer.h> -# include <zencore/iohash.h> -# include <zencore/system.h> -# include <zencore/scopeguard.h> -# include <zencore/timer.h> -# include <zenstore/cidstore.h> - -# include <span> - -namespace zen::compute { - -using namespace std::literals; - -LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir) -: FunctionRunner(BaseDir) -, m_Log(logging::Get("local_exec")) -, m_ChunkResolver(Resolver) -, m_WorkerPath(std::filesystem::weakly_canonical(BaseDir / "workers")) -, m_SandboxPath(std::filesystem::weakly_canonical(BaseDir / "scratch")) -{ - SystemMetrics Sm = GetSystemMetricsForReporting(); - - m_MaxRunningActions = Sm.LogicalProcessorCount * 2; - - ZEN_INFO("Max concurrent action count: {}", m_MaxRunningActions); - - bool DidCleanup = false; - - if (std::filesystem::is_directory(m_ActionsPath)) - { - ZEN_INFO("Cleaning '{}'", m_ActionsPath); - - std::error_code Ec; - CleanDirectory(m_ActionsPath, /* ForceRemoveReadOnlyFiles */ true, Ec); - - if (Ec) - { - ZEN_WARN("Unable to clean '{}': {}", m_ActionsPath, Ec.message()); - } - - DidCleanup = true; - } - - if (std::filesystem::is_directory(m_SandboxPath)) - { - ZEN_INFO("Cleaning '{}'", m_SandboxPath); - std::error_code Ec; - CleanDirectory(m_SandboxPath, /* ForceRemoveReadOnlyFiles */ true, Ec); - - if (Ec) - { - ZEN_WARN("Unable to clean '{}': {}", m_SandboxPath, Ec.message()); - } - - DidCleanup = true; - } - - // We clean out all workers on startup since we can't know they are good. They could be bad - // due to tampering, malware (which I also mean to include AV and antimalware software) or - // other processes we have no control over - if (std::filesystem::is_directory(m_WorkerPath)) - { - ZEN_INFO("Cleaning '{}'", m_WorkerPath); - std::error_code Ec; - CleanDirectory(m_WorkerPath, /* ForceRemoveReadOnlyFiles */ true, Ec); - - if (Ec) - { - ZEN_WARN("Unable to clean '{}': {}", m_WorkerPath, Ec.message()); - } - - DidCleanup = true; - } - - if (DidCleanup) - { - ZEN_INFO("Cleanup complete"); - } - - m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this}; - -# if ZEN_PLATFORM_WINDOWS - // Suppress any error dialogs caused by missing dependencies - UINT OldMode = ::SetErrorMode(0); - ::SetErrorMode(OldMode | SEM_FAILCRITICALERRORS); -# endif - - m_AcceptNewActions = true; -} - -LocalProcessRunner::~LocalProcessRunner() -{ - try - { - Shutdown(); - } - catch (std::exception& Ex) - { - ZEN_WARN("exception during local process runner shutdown: {}", Ex.what()); - } -} - -void -LocalProcessRunner::Shutdown() -{ - m_AcceptNewActions = false; - - m_MonitorThreadEnabled = false; - m_MonitorThreadEvent.Set(); - if (m_MonitorThread.joinable()) - { - m_MonitorThread.join(); - } - - CancelRunningActions(); -} - -std::filesystem::path -LocalProcessRunner::CreateNewSandbox() -{ - std::string UniqueId = std::to_string(++m_SandboxCounter); - std::filesystem::path Path = m_SandboxPath / UniqueId; - zen::CreateDirectories(Path); - - return Path; -} - -void -LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage) -{ - if (m_DumpActions) - { - CbObject WorkerDescriptor = WorkerPackage.GetObject(); - const IoHash& WorkerId = WorkerPackage.GetObjectHash(); - - std::string UniqueId = fmt::format("worker_{}"sv, WorkerId); - std::filesystem::path Path = m_ActionsPath / UniqueId; - - zen::WriteFile(Path / "worker.ucb", WorkerDescriptor.GetBuffer().AsIoBuffer()); - - ManifestWorker(WorkerPackage, Path / "tree", [&](const IoHash& Cid, CompressedBuffer& ChunkBuffer) { - std::filesystem::path ChunkPath = Path / "chunks" / Cid.ToHexString(); - zen::WriteFile(ChunkPath, ChunkBuffer.GetCompressed()); - }); - - ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, Path); - } -} - -size_t -LocalProcessRunner::QueryCapacity() -{ - // Estimate how much more work we're ready to accept - - RwLock::SharedLockScope _{m_RunningLock}; - - if (!m_AcceptNewActions) - { - return 0; - } - - size_t RunningCount = m_RunningMap.size(); - - if (RunningCount >= size_t(m_MaxRunningActions)) - { - return 0; - } - - return m_MaxRunningActions - RunningCount; -} - -std::vector<SubmitResult> -LocalProcessRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) -{ - std::vector<SubmitResult> Results; - - for (const Ref<RunnerAction>& Action : Actions) - { - Results.push_back(SubmitAction(Action)); - } - - return Results; -} - -SubmitResult -LocalProcessRunner::SubmitAction(Ref<RunnerAction> Action) -{ - // Verify whether we can accept more work - - { - RwLock::SharedLockScope _{m_RunningLock}; - - if (!m_AcceptNewActions) - { - return SubmitResult{.IsAccepted = false}; - } - - if (m_RunningMap.size() >= size_t(m_MaxRunningActions)) - { - return SubmitResult{.IsAccepted = false}; - } - } - - using namespace std::literals; - - // Each enqueued action is assigned an integer index (logical sequence number), - // which we use as a key for tracking data structures and as an opaque id which - // may be used by clients to reference the scheduled action - - const int32_t ActionLsn = Action->ActionLsn; - const CbObject& ActionObj = Action->ActionObj; - const IoHash ActionId = ActionObj.GetHash(); - - MaybeDumpAction(ActionLsn, ActionObj); - - std::filesystem::path SandboxPath = CreateNewSandbox(); - - CbPackage WorkerPackage = Action->Worker.Descriptor; - - std::filesystem::path WorkerPath = ManifestWorker(Action->Worker); - - // Write out action - - zen::WriteFile(SandboxPath / "build.action", ActionObj.GetBuffer().AsIoBuffer()); - - // Manifest inputs in sandbox - - ActionObj.IterateAttachments([&](CbFieldView Field) { - const IoHash Cid = Field.AsHash(); - std::filesystem::path FilePath{SandboxPath / "Inputs"sv / Cid.ToHexString()}; - IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(Cid); - - if (!DataBuffer) - { - throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid)); - } - - zen::WriteFile(FilePath, DataBuffer); - }); - -# if ZEN_PLATFORM_WINDOWS - // Set up environment variables - - StringBuilder<1024> EnvironmentBlock; - - CbObject WorkerDescription = WorkerPackage.GetObject(); - - for (auto& It : WorkerDescription["environment"sv]) - { - EnvironmentBlock.Append(It.AsString()); - EnvironmentBlock.Append('\0'); - } - EnvironmentBlock.Append('\0'); - EnvironmentBlock.Append('\0'); - - // Execute process - this spawns the child process immediately without waiting - // for completion - - std::string_view ExecPath = WorkerDescription["path"sv].AsString(); - std::filesystem::path ExePath = WorkerPath / std::filesystem::path(ExecPath).make_preferred(); - - ExtendableWideStringBuilder<512> CommandLine; - CommandLine.Append(L'"'); - CommandLine.Append(ExePath.c_str()); - CommandLine.Append(L'"'); - CommandLine.Append(L" -Build=build.action"); - - LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; - LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; - BOOL bInheritHandles = FALSE; - DWORD dwCreationFlags = 0; - - STARTUPINFO StartupInfo{}; - StartupInfo.cb = sizeof StartupInfo; - - PROCESS_INFORMATION ProcessInformation{}; - - ZEN_DEBUG("Executing: {}", WideToUtf8(CommandLine.c_str())); - - CommandLine.EnsureNulTerminated(); - - BOOL Success = CreateProcessW(nullptr, - CommandLine.Data(), - lpProcessAttributes, - lpThreadAttributes, - bInheritHandles, - dwCreationFlags, - (LPVOID)EnvironmentBlock.Data(), // Environment block - SandboxPath.c_str(), // Current directory - &StartupInfo, - /* out */ &ProcessInformation); - - if (!Success) - { - // TODO: this is probably not the best way to report failure. The return - // object should include a failure state and context - - zen::ThrowLastError("Unable to launch process" /* TODO: Add context */); - } - - CloseHandle(ProcessInformation.hThread); - - Ref<RunningAction> NewAction{new RunningAction()}; - NewAction->Action = Action; - NewAction->ProcessHandle = ProcessInformation.hProcess; - NewAction->SandboxPath = std::move(SandboxPath); - - { - RwLock::ExclusiveLockScope _(m_RunningLock); - - m_RunningMap[ActionLsn] = std::move(NewAction); - } - - Action->SetActionState(RunnerAction::State::Running); -# else - ZEN_UNUSED(ActionId); - - ZEN_NOT_IMPLEMENTED(); - - int ExitCode = 0; -# endif - - return SubmitResult{.IsAccepted = true}; -} - -size_t -LocalProcessRunner::GetSubmittedActionCount() -{ - RwLock::SharedLockScope _(m_RunningLock); - return m_RunningMap.size(); -} - -std::filesystem::path -LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker) -{ - RwLock::SharedLockScope _(m_WorkerLock); - - std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId); - - if (!std::filesystem::exists(WorkerDir)) - { - _.ReleaseNow(); - - RwLock::ExclusiveLockScope $(m_WorkerLock); - - if (!std::filesystem::exists(WorkerDir)) - { - ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {}); - } - } - - return WorkerDir; -} - -void -LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromPackage, - CbObjectView FileEntry, - const std::filesystem::path& SandboxRootPath, - std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback) -{ - std::string_view Name = FileEntry["name"sv].AsString(); - const IoHash ChunkHash = FileEntry["hash"sv].AsHash(); - const uint64_t Size = FileEntry["size"sv].AsUInt64(); - - CompressedBuffer Compressed; - - if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash)) - { - Compressed = Attachment->AsCompressedBinary(); - } - else - { - IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(ChunkHash); - - if (!DataBuffer) - { - throw std::runtime_error(fmt::format("worker chunk '{}' missing", ChunkHash)); - } - - uint64_t DataRawSize = 0; - IoHash DataRawHash; - Compressed = CompressedBuffer::FromCompressed(SharedBuffer{DataBuffer}, DataRawHash, DataRawSize); - - if (DataRawSize != Size) - { - throw std::runtime_error( - fmt::format("worker chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size)); - } - } - - ChunkReferenceCallback(ChunkHash, Compressed); - - std::filesystem::path FilePath{SandboxRootPath / std::filesystem::path(Name).make_preferred()}; - - SharedBuffer Decompressed = Compressed.Decompress(); - zen::WriteFile(FilePath, Decompressed.AsIoBuffer()); -} - -void -LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage, - const std::filesystem::path& SandboxPath, - std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback) -{ - CbObject WorkerDescription = WorkerPackage.GetObject(); - - // Manifest worker in Sandbox - - for (auto& It : WorkerDescription["executables"sv]) - { - DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback); - } - - for (auto& It : WorkerDescription["dirs"sv]) - { - std::string_view Name = It.AsString(); - std::filesystem::path DirPath{SandboxPath / std::filesystem::path(Name).make_preferred()}; - zen::CreateDirectories(DirPath); - } - - for (auto& It : WorkerDescription["files"sv]) - { - DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback); - } - - WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer()); -} - -CbPackage -LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath) -{ - std::filesystem::path OutputFile = SandboxPath / "build.output"; - FileContents OutputData = zen::ReadFile(OutputFile); - - if (OutputData.ErrorCode) - { - throw std::system_error(OutputData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputFile)); - } - - CbPackage OutputPackage; - CbObject Output = zen::LoadCompactBinaryObject(OutputData.Flatten()); - - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalRawAttachmentBytes = 0; - - Output.IterateAttachments([&](CbFieldView Field) { - IoHash Hash = Field.AsHash(); - std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; - FileContents ChunkData = zen::ReadFile(OutputPath); - - if (ChunkData.ErrorCode) - { - throw std::system_error(ChunkData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputPath)); - } - - uint64_t ChunkDataRawSize = 0; - IoHash ChunkDataHash; - CompressedBuffer AttachmentBuffer = - CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Flatten()), ChunkDataHash, ChunkDataRawSize); - - if (!AttachmentBuffer) - { - throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)"); - } - - TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); - TotalRawAttachmentBytes += ChunkDataRawSize; - - CbAttachment Attachment(std::move(AttachmentBuffer), ChunkDataHash); - OutputPackage.AddAttachment(Attachment); - }); - - OutputPackage.SetObject(Output); - - ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)", - OutputPackage.GetAttachments().size(), - NiceBytes(TotalAttachmentBytes), - NiceBytes(TotalRawAttachmentBytes)); - - return OutputPackage; -} - -void -LocalProcessRunner::MonitorThreadFunction() -{ - SetCurrentThreadName("LocalProcessRunner_Monitor"); - - auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); }); - - do - { - // On Windows it's possible to wait on process handles, so we wait for either a process to exit - // or for the monitor event to be signaled (which indicates we should check for cancellation - // or shutdown). This could be further improved by using a completion port and registering process - // handles with it, but this is a reasonable first implementation given that we shouldn't be dealing - // with an enormous number of concurrent processes. - // - // On other platforms we just wait on the monitor event and poll for process exits at intervals. -# if ZEN_PLATFORM_WINDOWS - auto WaitOnce = [&] { - HANDLE WaitHandles[MAXIMUM_WAIT_OBJECTS]; - - uint32_t NumHandles = 0; - - WaitHandles[NumHandles++] = m_MonitorThreadEvent.GetWindowsHandle(); - - m_RunningLock.WithSharedLock([&] { - for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd && NumHandles < MAXIMUM_WAIT_OBJECTS; ++It) - { - Ref<RunningAction> Action = It->second; - - WaitHandles[NumHandles++] = Action->ProcessHandle; - } - }); - - DWORD WaitResult = WaitForMultipleObjects(NumHandles, WaitHandles, FALSE, 1000); - - // return true if a handle was signaled - return (WaitResult <= NumHandles); - }; -# else - auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(1000); }; -# endif - - while (!WaitOnce()) - { - if (m_MonitorThreadEnabled == false) - { - return; - } - - SweepRunningActions(); - } - - // Signal received - - SweepRunningActions(); - } while (m_MonitorThreadEnabled); -} - -void -LocalProcessRunner::CancelRunningActions() -{ - Stopwatch Timer; - std::unordered_map<int, Ref<RunningAction>> RunningMap; - - m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); }); - - if (RunningMap.empty()) - { - return; - } - - ZEN_INFO("cancelling all running actions"); - - // For expedience we initiate the process termination for all known - // processes before attempting to wait for them to exit. - - std::vector<int> TerminatedLsnList; - - for (const auto& Kv : RunningMap) - { - Ref<RunningAction> Action = Kv.second; - - // Terminate running process - -# if ZEN_PLATFORM_WINDOWS - BOOL Success = TerminateProcess(Action->ProcessHandle, 222); - - if (Success) - { - TerminatedLsnList.push_back(Kv.first); - } - else - { - DWORD LastError = GetLastError(); - - if (LastError != ERROR_ACCESS_DENIED) - { - ZEN_WARN("TerminateProcess for LSN {} not successful: {}", Action->Action->ActionLsn, GetSystemErrorAsString(LastError)); - } - } -# else - ZEN_NOT_IMPLEMENTED("need to implement process termination"); -# endif - } - - // We only post results for processes we have terminated, in order - // to avoid multiple results getting posted for the same action - - for (int Lsn : TerminatedLsnList) - { - if (auto It = RunningMap.find(Lsn); It != RunningMap.end()) - { - Ref<RunningAction> Running = It->second; - -# if ZEN_PLATFORM_WINDOWS - if (Running->ProcessHandle != INVALID_HANDLE_VALUE) - { - DWORD WaitResult = WaitForSingleObject(Running->ProcessHandle, 2000); - - if (WaitResult != WAIT_OBJECT_0) - { - ZEN_WARN("wait for LSN {}: process exit did not succeed, result = {}", Running->Action->ActionLsn, WaitResult); - } - else - { - ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn); - } - } -# endif - - // Clean up and post error result - - DeleteDirectories(Running->SandboxPath); - Running->Action->SetActionState(RunnerAction::State::Failed); - } - } - - ZEN_INFO("DONE - cancelled {} running processes (took {})", TerminatedLsnList.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); -} - -void -LocalProcessRunner::SweepRunningActions() -{ - std::vector<Ref<RunningAction>> CompletedActions; - - m_RunningLock.WithExclusiveLock([&] { - // TODO: It would be good to not hold the exclusive lock while making - // system calls and other expensive operations. - - for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;) - { - Ref<RunningAction> Action = It->second; - -# if ZEN_PLATFORM_WINDOWS - DWORD ExitCode = 0; - BOOL IsSuccess = GetExitCodeProcess(Action->ProcessHandle, &ExitCode); - - if (IsSuccess && ExitCode != STILL_ACTIVE) - { - CloseHandle(Action->ProcessHandle); - Action->ProcessHandle = INVALID_HANDLE_VALUE; - - CompletedActions.push_back(std::move(Action)); - It = m_RunningMap.erase(It); - } - else - { - ++It; - } -# else - // TODO: implement properly for Mac/Linux - - ZEN_UNUSED(Action); -# endif - } - }); - - // Notify outer. Note that this has to be done without holding any local locks - // otherwise we may end up with deadlocks. - - for (Ref<RunningAction> Running : CompletedActions) - { - const int ActionLsn = Running->Action->ActionLsn; - - if (Running->ExitCode == 0) - { - try - { - // Gather outputs - - CbPackage OutputPackage = GatherActionOutputs(Running->SandboxPath); - - Running->Action->SetResult(std::move(OutputPackage)); - Running->Action->SetActionState(RunnerAction::State::Completed); - - // We can delete the files at this point - if (!DeleteDirectories(Running->SandboxPath)) - { - ZEN_WARN("Unable to delete directory '{}', this will continue to exist until service restart", Running->SandboxPath); - } - - // Success -- continue with next iteration of the loop - continue; - } - catch (std::exception& Ex) - { - ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what()); - } - } - - // Failed - for now this is indicated with an empty package in - // the results map. We can clean out the sandbox directory immediately. - - std::error_code Ec; - DeleteDirectories(Running->SandboxPath, Ec); - - if (Ec) - { - ZEN_WARN("Unable to delete sandbox directory '{}': {}", Running->SandboxPath, Ec.message()); - } - - Running->Action->SetActionState(RunnerAction::State::Failed); - } -} - -} // namespace zen::compute - -#endif |