diff options
| author | Joe Kirchoff <[email protected]> | 2022-03-22 11:47:38 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-03-22 11:47:38 -0700 |
| commit | cc5adf4cb79c92993fabfe09e75dfadb7d4c9665 (patch) | |
| tree | 4ba0a18f68e39685fa784d872bbb4bb9ba2b6fd7 /zenserver/compute/apply.cpp | |
| parent | move workthreadpool to zencore (#63) (diff) | |
| download | zen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.tar.xz zen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.zip | |
Enable Horde compute code on Linux & Mac (#61)
Diffstat (limited to 'zenserver/compute/apply.cpp')
| -rw-r--r-- | zenserver/compute/apply.cpp | 992 |
1 files changed, 0 insertions, 992 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp deleted file mode 100644 index 694e9f662..000000000 --- a/zenserver/compute/apply.cpp +++ /dev/null @@ -1,992 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "apply.h" - -#if ZEN_WITH_COMPUTE_SERVICES - -# include <upstream/jupiter.h> -# include <upstream/upstreamapply.h> -# include <upstream/upstreamcache.h> -# include <zencore/compactbinary.h> -# include <zencore/compactbinarybuilder.h> -# include <zencore/compactbinarypackage.h> -# include <zencore/compress.h> -# include <zencore/except.h> -# include <zencore/filesystem.h> -# include <zencore/fmtutils.h> -# include <zencore/iobuffer.h> -# include <zencore/iohash.h> -# include <zencore/scopeguard.h> -# include <zenstore/cas.h> -# include <zenstore/cidstore.h> - -# include <zencore/windows.h> -ZEN_THIRD_PARTY_INCLUDES_START -# include <AccCtrl.h> -# include <AclAPI.h> -# include <UserEnv.h> -# include <sddl.h> -# pragma comment(lib, "UserEnv.lib") -# include <atlbase.h> -ZEN_THIRD_PARTY_INCLUDES_END - -# include <filesystem> -# include <span> - -using namespace std::literals; - -namespace zen { - -struct BasicFunctionJob -{ -public: - BasicFunctionJob() = default; - ~BasicFunctionJob(); - - void SetWorkingDirectory(const std::filesystem::path& WorkingDirectory) { m_WorkingDirectory = WorkingDirectory; } - bool SpawnJob(std::filesystem::path ExePath, std::wstring CommandLine); - bool Wait(uint32_t TimeoutMs = ~0); - int ExitCode(); - -private: - std::filesystem::path m_WorkingDirectory; - int m_ProcessId = 0; - CHandle m_ProcessHandle; -}; - -BasicFunctionJob::~BasicFunctionJob() -{ - Wait(); -} - -bool -BasicFunctionJob::SpawnJob(std::filesystem::path ExePath, std::wstring CommandLine) -{ - STARTUPINFOEX StartupInfo = {sizeof(STARTUPINFOEX)}; - PROCESS_INFORMATION ProcessInfo{}; - - std::wstring ExePathNative = ExePath.native(); - std::wstring WorkingDirNative = m_WorkingDirectory.native(); - - BOOL Created = ::CreateProcess(ExePathNative.data() /* ApplicationName */, - CommandLine.data() /* Command Line */, - nullptr /* Process Attributes */, - nullptr /* Security Attributes */, - FALSE /* InheritHandles */, - 0 /* Flags */, - nullptr /* Environment */, - WorkingDirNative.data() /* Current Directory */, - (LPSTARTUPINFO)&StartupInfo, - &ProcessInfo); - - if (!Created) - { - throw std::system_error(::GetLastError(), std::system_category(), fmt::format("Failed to create process '{}'", ExePath).c_str()); - } - - m_ProcessId = ProcessInfo.dwProcessId; - m_ProcessHandle.Attach(ProcessInfo.hProcess); - ::CloseHandle(ProcessInfo.hThread); - - ZEN_INFO("Created process {}", m_ProcessId); - - return true; -} - -bool -BasicFunctionJob::Wait(uint32_t TimeoutMs) -{ - if (!m_ProcessHandle) - { - return true; - } - - DWORD WaitResult = WaitForSingleObject(m_ProcessHandle, TimeoutMs); - - if (WaitResult == WAIT_TIMEOUT) - { - return false; - } - - if (WaitResult == WAIT_OBJECT_0) - { - return true; - } - - throw std::runtime_error("Failed wait on process handle"); -} - -int -BasicFunctionJob::ExitCode() -{ - DWORD Ec = 0; - BOOL Success = GetExitCodeProcess(m_ProcessHandle, &Ec); - - if (!Success) - { - ZEN_WARN("failed getting exit code"); - } - - if (Ec == STILL_ACTIVE) - { - ZEN_WARN("getting exit code but process is STILL_ACTIVE"); - } - - return gsl::narrow_cast<int>(Ec); -} - -//////////////////////////////////////////////////////////////////////////////// - -struct SandboxedFunctionJob -{ - SandboxedFunctionJob() = default; - ~SandboxedFunctionJob() = default; - - void SetWorkingDirectory(const std::filesystem::path& WorkingDirectory) { m_WorkingDirectory = WorkingDirectory; } - void Initialize(std::string_view AppContainerId); - bool SpawnJob(std::filesystem::path ExePath); - void AddWhitelistFile(const std::filesystem::path& FilePath) { m_WhitelistFiles.push_back(FilePath); } - -private: - bool GrantNamedObjectAccess(PWSTR Name, SE_OBJECT_TYPE Type, ACCESS_MASK AccessMask, bool Recursive); - - std::filesystem::path m_WorkingDirectory; - std::vector<std::filesystem::path> m_WhitelistFiles; - std::vector<std::wstring> m_WhitelistRegistryKeys; - PSID m_AppContainerSid = nullptr; - bool m_IsInitialized = false; -}; - -bool -SandboxedFunctionJob::GrantNamedObjectAccess(PWSTR ObjectName, SE_OBJECT_TYPE ObjectType, ACCESS_MASK AccessMask, bool Recursive) -{ - DWORD Status; - PACL NewAcl = nullptr; - - DWORD grfInhericance = 0; - - if (Recursive) - { - grfInhericance = OBJECT_INHERIT_ACE | CONTAINER_INHERIT_ACE; - } - - EXPLICIT_ACCESS Access{.grfAccessPermissions = AccessMask, - .grfAccessMode = GRANT_ACCESS, - .grfInheritance = grfInhericance, - .Trustee = {.pMultipleTrustee = nullptr, - .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE, - .TrusteeForm = TRUSTEE_IS_SID, - .TrusteeType = TRUSTEE_IS_GROUP, - .ptstrName = (PWSTR)m_AppContainerSid}}; - - PACL OldAcl = nullptr; - - Status = GetNamedSecurityInfo(ObjectName /* ObjectName */, - ObjectType /* ObjectType */, - DACL_SECURITY_INFORMATION /* SecurityInfo */, - nullptr /* ppsidOwner */, - nullptr /* ppsidGroup */, - &OldAcl /* ppDacl */, - nullptr /* ppSacl */, - nullptr /* ppSecurityDescriptor */); - if (Status != ERROR_SUCCESS) - return false; - - Status = SetEntriesInAcl(1 /* CountOfExplicitEntries */, &Access /* pListOfExplicitEntries */, OldAcl, &NewAcl); - if (Status != ERROR_SUCCESS) - return false; - - Status = SetNamedSecurityInfo(ObjectName /* ObjectName */, - ObjectType /* ObjectType */, - DACL_SECURITY_INFORMATION /*SecurityInfo */, - nullptr /* psidOwner */, - nullptr /* psidGroup */, - NewAcl /* pDacl */, - nullptr /* pSacl */); - if (NewAcl) - ::LocalFree(NewAcl); - - return Status == ERROR_SUCCESS; -} - -void -SandboxedFunctionJob::Initialize(std::string_view AppContainerId) -{ - if (m_IsInitialized) - { - return; - } - - std::wstring ContainerName = zen::Utf8ToWide(AppContainerId); - - HRESULT hRes = ::CreateAppContainerProfile(ContainerName.c_str(), - ContainerName.c_str() /* Display Name */, - ContainerName.c_str() /* Description */, - nullptr /* Capabilities */, - 0 /* Capability Count */, - &m_AppContainerSid); - - if (FAILED(hRes)) - { - hRes = ::DeriveAppContainerSidFromAppContainerName(ContainerName.c_str(), &m_AppContainerSid); - - if (FAILED(hRes)) - { - ZEN_ERROR("Failed creating app container SID"); - } - } - - // Debugging context - - PWSTR Str = nullptr; - ::ConvertSidToStringSid(m_AppContainerSid, &Str); - - ZEN_INFO("AppContainer SID : '{}'", WideToUtf8(Str)); - - PWSTR Path = nullptr; - if (SUCCEEDED(::GetAppContainerFolderPath(Str, &Path))) - { - ZEN_INFO("AppContainer folder: '{}'", WideToUtf8(Path)); - - ::CoTaskMemFree(Path); - } - ::LocalFree(Str); - - m_IsInitialized = true; -} - -bool -SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath) -{ - // Build process attributes - - SECURITY_CAPABILITIES Sc = {0}; - Sc.AppContainerSid = m_AppContainerSid; - - STARTUPINFOEX StartupInfo = {sizeof(STARTUPINFOEX)}; - PROCESS_INFORMATION ProcessInfo{}; - SIZE_T Size = 0; - - ::InitializeProcThreadAttributeList(nullptr, 1, 0, &Size); - - auto AttrBuffer = std::make_unique<uint8_t[]>(Size); - StartupInfo.lpAttributeList = reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(AttrBuffer.get()); - - if (!::InitializeProcThreadAttributeList(StartupInfo.lpAttributeList, 1, 0, &Size)) - { - return false; - } - - if (!::UpdateProcThreadAttribute(StartupInfo.lpAttributeList, - 0, - PROC_THREAD_ATTRIBUTE_SECURITY_CAPABILITIES, - &Sc, - sizeof Sc, - nullptr, - nullptr)) - { - return false; - } - - // Set up security for files/folders/registry - - for (const std::filesystem::path& File : m_WhitelistFiles) - { - std::wstring NativeFileName = File.native(); - GrantNamedObjectAccess(NativeFileName.data(), SE_FILE_OBJECT, FILE_ALL_ACCESS, true); - } - - for (std::wstring& RegKey : m_WhitelistRegistryKeys) - { - GrantNamedObjectAccess(RegKey.data(), SE_REGISTRY_WOW64_32KEY, KEY_ALL_ACCESS, true); - } - - std::wstring ExePathNative = ExePath.native(); - std::wstring WorkingDirNative = m_WorkingDirectory.native(); - - BOOL Created = ::CreateProcess(nullptr /* ApplicationName */, - ExePathNative.data() /* Command line */, - nullptr /* Process Attributes */, - nullptr /* Security Attributes */, - FALSE /* InheritHandles */, - EXTENDED_STARTUPINFO_PRESENT | CREATE_NEW_CONSOLE /* Flags */, - nullptr /* Environment */, - WorkingDirNative.data() /* Current Directory */, - (LPSTARTUPINFO)&StartupInfo, - &ProcessInfo); - - DeleteProcThreadAttributeList(StartupInfo.lpAttributeList); - - if (!Created) - { - return false; - } - - ZEN_INFO("Created process {}", ProcessInfo.dwProcessId); - - return true; -} - -//////////////////////////////////////////////////////////////////////////////// - -HttpFunctionService::HttpFunctionService(CasStore& Store, - CidStore& InCidStore, - const std::filesystem::path& BaseDir, - const CloudCacheClientOptions& ComputeOptions, - const CloudCacheClientOptions& StorageOptions, - const UpstreamAuthConfig& ComputeAuthConfig, - const UpstreamAuthConfig& StorageAuthConfig, - AuthMgr& Mgr) -: m_Log(logging::Get("apply")) -, m_CasStore(Store) -, m_CidStore(InCidStore) -, m_SandboxPath(BaseDir / "scratch") -, m_FunctionPath(BaseDir / "func") -{ - m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore); - - auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, - ComputeAuthConfig, - StorageOptions, - StorageAuthConfig, - m_CasStore, - m_CidStore, - Mgr); - m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); - m_UpstreamApply->Initialize(); - - m_Router.AddPattern("job", "([[:digit:]]+)"); - m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); - m_Router.AddPattern("action", "([[:xdigit:]]{40})"); - - m_Router.RegisterRoute( - "workers/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - RwLock::SharedLockScope _(m_WorkerLock); - - if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - else - { - const WorkerDesc& Desc = It->second; - return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor); - } - } - break; - - case HttpVerb::kPost: - { - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - CbObject FunctionSpec = HttpReq.ReadPayloadObject(); - - // Determine which pieces are missing and need to be transmitted to populate CAS - - CasChunkSet ChunkSet; - - FunctionSpec.IterateAttachments([&](CbFieldView Field) { - const IoHash Hash = Field.AsHash(); - ChunkSet.AddChunkToSet(Hash); - }); - - // Note that we store executables uncompressed to make it - // more straightforward and efficient to materialize them, hence - // the CAS lookup here instead of CID for the input payloads - - m_CasStore.FilterChunks(ChunkSet); - - if (ChunkSet.IsEmpty()) - { - RwLock::ExclusiveLockScope _(m_WorkerLock); - - m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); - - ZEN_DEBUG("worker {}: all attachments already available", WorkerId); - - return HttpReq.WriteResponse(HttpResponseCode::NoContent); - } - else - { - CbObjectWriter ResponseWriter; - ResponseWriter.BeginArray("need"); - - ChunkSet.IterateChunks([&](const IoHash& Hash) { - ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); - - ResponseWriter.AddHash(Hash); - }); - - ResponseWriter.EndArray(); - - ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); - } - } - break; - - case HttpContentType::kCbPackage: - { - CbPackage FunctionSpec = HttpReq.ReadPayloadPackage(); - - CbObject Obj = FunctionSpec.GetObject(); - - std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - SharedBuffer Decompressed = DataView.Decompress(); - const uint64_t DecompressedSize = DataView.GetRawSize(); - - ZEN_UNUSED(DataHash); - - TotalAttachmentBytes += DecompressedSize; - ++AttachmentCount; - - // Note that we store executables uncompressed to make it - // more straightforward and efficient to materialize them - - const CasStore::InsertResult InsertResult = - m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash())); - - if (InsertResult.New) - { - TotalNewBytes += DecompressedSize; - ++NewAttachmentCount; - } - } - - ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", - WorkerId, - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - RwLock::ExclusiveLockScope _(m_WorkerLock); - - m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj}); - - return HttpReq.WriteResponse(HttpResponseCode::NoContent); - } - break; - - default: - break; - } - } - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/{job}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - break; - - case HttpVerb::kPost: - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/{worker}/{action}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - CbPackage Output; - HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - } - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "jobs/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - WorkerDesc Worker; - - { - RwLock::SharedLockScope _(m_WorkerLock); - - if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - else - { - Worker = It->second; - } - } - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - // TODO: return status of all pending or executing jobs - break; - - case HttpVerb::kPost: - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - // This operation takes the proposed job spec and identifies which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response - - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject RequestObject = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - RequestObject.IterateAttachments([&](CbFieldView Field) { - const IoHash FileHash = Field.AsHash(); - - if (!m_CidStore.ContainsChunk(FileHash)) - { - NeedList.push_back(FileHash); - } - }); - - if (NeedList.empty()) - { - // We already have everything - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output); - - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - CbObject Response = Cbo.Save(); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); - } - break; - - case HttpContentType::kCbPackage: - { - CbPackage Action = HttpReq.ReadPayloadPackage(); - CbObject ActionObj = Action.GetObject(); - - std::span<const CbAttachment> Attachments = Action.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); - - const uint64_t CompressedSize = DataView.GetCompressedSize(); - - TotalAttachmentBytes += CompressedSize; - ++AttachmentCount; - - const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); - - if (InsertResult.New) - { - TotalNewBytes += CompressedSize; - ++NewAttachmentCount; - } - } - - ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)", - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output); - - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - - default: - break; - } - break; - - default: - break; - } - }, - HttpVerb::kPost); -} - -HttpFunctionService::~HttpFunctionService() -{ -} - -const char* -HttpFunctionService::BaseUri() const -{ - return "/apply/"; -} - -void -HttpFunctionService::HandleRequest(HttpServerRequest& Request) -{ - if (m_Router.HandleRequest(Request) == false) - { - ZEN_WARN("No route found for {0}", Request.RelativeUri()); - } -} - -std::filesystem::path -HttpFunctionService::CreateNewSandbox() -{ - std::string UniqueId = std::to_string(++m_SandboxCount); - std::filesystem::path Path = m_SandboxPath / UniqueId; - zen::CreateDirectories(Path); - return Path; -} - -CbPackage -HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) -{ - using namespace std::literals; - - std::filesystem::path SandboxPath = CreateNewSandbox(); - - CbObject Desc = Worker.Descriptor; - - // Manifest worker in Sandbox - - for (auto& It : Desc["executables"]) - { - CbObjectView ExecEntry = It.AsObjectView(); - - std::string_view Name = ExecEntry["name"sv].AsString(); - const IoHash ChunkHash = ExecEntry["hash"sv].AsHash(); - const uint64_t Size = ExecEntry["size"sv].AsUInt64(); - - std::filesystem::path FilePath{SandboxPath / Name}; - IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash); - - if (!DataBuffer) - { - throw std::runtime_error(fmt::format("worker CAS chunk '{}' missing", ChunkHash)); - } - - if (DataBuffer.Size() != Size) - { - throw std::runtime_error( - fmt::format("worker CAS chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size)); - } - - zen::WriteFile(FilePath, DataBuffer); - } - - for (auto& It : Desc["dirs"]) - { - std::string_view Name = It.AsString(); - std::filesystem::path DirPath{SandboxPath / Name}; - zen::CreateDirectories(DirPath); - } - - for (auto& It : Desc["files"]) - { - CbObjectView FileEntry = It.AsObjectView(); - - std::string_view Name = FileEntry["name"sv].AsString(); - const IoHash ChunkHash = FileEntry["hash"sv].AsHash(); - const uint64_t Size = FileEntry["size"sv].AsUInt64(); - - std::filesystem::path FilePath{SandboxPath / Name}; - IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash); - - if (!DataBuffer) - { - throw std::runtime_error(fmt::format("worker CAS chunk '{}' missing", ChunkHash)); - } - - if (DataBuffer.Size() != Size) - { - throw std::runtime_error( - fmt::format("worker CAS chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size)); - } - - zen::WriteFile(FilePath, DataBuffer); - } - - // Write out action - - zen::WriteFile(SandboxPath / "build.action", Action.GetBuffer().AsIoBuffer()); - - // Manifest inputs in sandbox - - Action.IterateAttachments([&](CbFieldView Field) { - const IoHash Cid = Field.AsHash(); - std::filesystem::path FilePath{SandboxPath / "Inputs" / Cid.ToHexString()}; - IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); - - if (!DataBuffer) - { - throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid)); - } - - zen::WriteFile(FilePath, DataBuffer); - }); - - // Set up environment variables - - StringBuilder<1024> EnvironmentBlock; - - for (auto& It : Desc["environment"]) - { - EnvironmentBlock.Append(It.AsString()); - EnvironmentBlock.Append('\0'); - } - EnvironmentBlock.Append('\0'); - EnvironmentBlock.Append('\0'); - - // Execute process - - std::string_view ExecPath = Desc["path"].AsString(); - std::filesystem::path ExePath = SandboxPath / ExecPath; - - WideStringBuilder<512> CommandLine; - CommandLine.Append(L'"'); - CommandLine.Append(ExePath.c_str()); - CommandLine.Append(L'"'); - CommandLine.Append(L" -Build=build.action"); - - LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; - LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; - BOOL bInheritHandles = FALSE; - DWORD dwCreationFlags = 0; - - STARTUPINFO StartupInfo{}; - StartupInfo.cb = sizeof StartupInfo; - - PROCESS_INFORMATION ProcessInformation{}; - - BOOL Success = CreateProcessW(nullptr, - CommandLine.Data(), - lpProcessAttributes, - lpThreadAttributes, - bInheritHandles, - dwCreationFlags, - (LPVOID)EnvironmentBlock.Data(), // Environment block - SandboxPath.c_str(), // Current directory - &StartupInfo, - /* out */ &ProcessInformation); - - if (!Success) - { - zen::ThrowLastError("Unable to launch process" /* TODO: Add context */); - } - - CloseHandle(ProcessInformation.hThread); - auto _ = MakeGuard([&] { CloseHandle(ProcessInformation.hProcess); }); - - DWORD Result = WaitForSingleObject(ProcessInformation.hProcess, INFINITE); - - if (Result != WAIT_OBJECT_0) - { - zen::ThrowLastError("Process wait failed" /* TODO: Add context */); - } - - DWORD ExitCode = 0; - GetExitCodeProcess(ProcessInformation.hProcess, &ExitCode); - - // Gather outputs - - FileContents OutputData = zen::ReadFile(SandboxPath / "build.output"); - - if (OutputData.ErrorCode) - { - throw std::system_error(OutputData.ErrorCode, "Failed to read build output file"); - } - - // TODO: should have a more straightforward way to perform this - ZEN_ASSERT(OutputData.Data.size() == 1); - - CbPackage OutputPackage; - CbObject Output = zen::LoadCompactBinaryObject(OutputData.Data[0]); - - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalRawAttachmentBytes = 0; - - Output.IterateAttachments([&](CbFieldView Field) { - IoHash Hash = Field.AsHash(); - std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; - - FileContents ChunkData = zen::ReadFile(OutputPath); - - if (ChunkData.ErrorCode) - { - throw std::system_error(ChunkData.ErrorCode, "Failed to read build output chunk file"); - } - - ZEN_ASSERT(OutputData.Data.size() == 1); - - CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0])); - - if (!AttachmentBuffer) - { - throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)"); - } - - TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); - TotalRawAttachmentBytes += AttachmentBuffer.GetCompressedSize(); - - CbAttachment Attachment(AttachmentBuffer); - OutputPackage.AddAttachment(Attachment); - }); - - OutputPackage.SetObject(Output); - - ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)", - OutputPackage.GetAttachments().size(), - NiceBytes(TotalAttachmentBytes), - NiceBytes(TotalRawAttachmentBytes)); - - return OutputPackage; -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object) -{ - const IoHash WorkerId = Worker.Descriptor.GetHash(); - const IoHash ActionId = Action.GetHash(); - - Action.MakeOwned(); - - ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); - - auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)}); - - if (!EnqueueResult.Success) - { - ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString()); - return HttpResponseCode::InternalServerError; - } - - CbObjectWriter Writer; - Writer.AddHash("worker", WorkerId); - Writer.AddHash("action", ActionId); - - Object = std::move(Writer.Save()); - return HttpResponseCode::OK; -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) -{ - auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); - if (!Status.Success) - { - // throw std::runtime_error(fmt::format("Action {}/{} not found", WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); - return HttpResponseCode::NotFound; - } - - if (Status.Status.State != UpstreamApplyState::Complete) - { - return HttpResponseCode::Accepted; - } - - GetUpstreamApplyResult& Completed = Status.Status.Result; - if (!Completed.Success || Completed.Error.ErrorCode != 0) - { - ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", - WorkerId.ToHexString(), - ActionId.ToHexString(), - Completed.StdOut, - Completed.StdErr, - Completed.Error.Reason, - Completed.Error.ErrorCode); - - return HttpResponseCode::InternalServerError; - } - - ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", - WorkerId.ToHexString(), - ActionId.ToHexString(), - Completed.OutputPackage.GetAttachments().size(), - NiceBytes(Completed.TotalAttachmentBytes), - NiceBytes(Completed.TotalRawAttachmentBytes)); - - Package = std::move(Completed.OutputPackage); - return HttpResponseCode::OK; -} - -} // namespace zen - -#endif // ZEN_WITH_COMPUTE_SERVICES |