diff options
| author | Joe Kirchoff <[email protected]> | 2022-03-22 11:47:38 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-03-22 11:47:38 -0700 |
| commit | cc5adf4cb79c92993fabfe09e75dfadb7d4c9665 (patch) | |
| tree | 4ba0a18f68e39685fa784d872bbb4bb9ba2b6fd7 | |
| parent | move workthreadpool to zencore (#63) (diff) | |
| download | zen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.tar.xz zen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.zip | |
Enable Horde compute code on Linux & Mac (#61)
| -rw-r--r-- | xmake.lua | 9 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 10 | ||||
| -rw-r--r-- | zenserver/compute/apply.cpp | 992 | ||||
| -rw-r--r-- | zenserver/compute/function.cpp | 473 | ||||
| -rw-r--r-- | zenserver/compute/function.h (renamed from zenserver/compute/apply.h) | 10 | ||||
| -rw-r--r-- | zenserver/config.cpp | 5 | ||||
| -rw-r--r-- | zenserver/config.h | 5 | ||||
| -rw-r--r-- | zenserver/testing/launch.cpp | 12 | ||||
| -rw-r--r-- | zenserver/testing/launch.h | 8 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 107 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.h | 17 | ||||
| -rw-r--r-- | zenserver/xmake.lua | 1 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 53 | ||||
| -rw-r--r-- | zenutil/zenserverprocess.cpp | 4 |
14 files changed, 568 insertions, 1138 deletions
@@ -119,12 +119,19 @@ if is_os("windows") then end option("compute") - set_default(is_os("windows")) + set_default(true) set_showmenu(true) set_description("Enable compute services endpoint") option_end() add_define_by_config("ZEN_WITH_COMPUTE_SERVICES", "compute") +option("exec") + set_default(is_os("windows")) + set_showmenu(true) + set_description("Enable exec services endpoint") +option_end() +add_define_by_config("ZEN_WITH_EXEC_SERVICES", "exec") + option("zenmesh") set_default(false) set_showmenu(true) diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 16f47cd84..a0d97a489 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -58,8 +58,6 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <asio.hpp> -#define ZEN_USE_EXEC 0 // Note: this should really be a global define to match the zenserver definition - ////////////////////////////////////////////////////////////////////////// #include "projectclient.h" @@ -2355,7 +2353,7 @@ TEST_CASE("zcache.rpc.allpolicies") } } -# if ZEN_USE_EXEC +# if ZEN_WITH_EXEC_SERVICES struct RemoteExecutionRequest { @@ -2494,7 +2492,7 @@ private: TEST_CASE("exec.basic") { -# if ZEN_WITH_COMPUTE_SERVICES +# if ZEN_WITH_EXEC_SERVICES using namespace std::literals; std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); @@ -2525,7 +2523,7 @@ TEST_CASE("exec.basic") CHECK(Result["exitcode"sv].AsInt32(-1) == 1); } -# endif // ZEN_WITH_COMPUTE_SERVICES +# endif // ZEN_WITH_EXEC_SERVICES } TEST_CASE("mesh.basic") @@ -2559,7 +2557,7 @@ TEST_CASE("mesh.basic") # endif } -# endif +# endif // ZEN_WITH_EXEC_SERVICES class ZenServerTestHelper { diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp deleted file mode 100644 index 694e9f662..000000000 --- a/zenserver/compute/apply.cpp +++ /dev/null @@ -1,992 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "apply.h" - -#if ZEN_WITH_COMPUTE_SERVICES - -# include <upstream/jupiter.h> -# include <upstream/upstreamapply.h> -# include <upstream/upstreamcache.h> -# include <zencore/compactbinary.h> -# include <zencore/compactbinarybuilder.h> -# include <zencore/compactbinarypackage.h> -# include <zencore/compress.h> -# include <zencore/except.h> -# include <zencore/filesystem.h> -# include <zencore/fmtutils.h> -# include <zencore/iobuffer.h> -# include <zencore/iohash.h> -# include <zencore/scopeguard.h> -# include <zenstore/cas.h> -# include <zenstore/cidstore.h> - -# include <zencore/windows.h> -ZEN_THIRD_PARTY_INCLUDES_START -# include <AccCtrl.h> -# include <AclAPI.h> -# include <UserEnv.h> -# include <sddl.h> -# pragma comment(lib, "UserEnv.lib") -# include <atlbase.h> -ZEN_THIRD_PARTY_INCLUDES_END - -# include <filesystem> -# include <span> - -using namespace std::literals; - -namespace zen { - -struct BasicFunctionJob -{ -public: - BasicFunctionJob() = default; - ~BasicFunctionJob(); - - void SetWorkingDirectory(const std::filesystem::path& WorkingDirectory) { m_WorkingDirectory = WorkingDirectory; } - bool SpawnJob(std::filesystem::path ExePath, std::wstring CommandLine); - bool Wait(uint32_t TimeoutMs = ~0); - int ExitCode(); - -private: - std::filesystem::path m_WorkingDirectory; - int m_ProcessId = 0; - CHandle m_ProcessHandle; -}; - -BasicFunctionJob::~BasicFunctionJob() -{ - Wait(); -} - -bool -BasicFunctionJob::SpawnJob(std::filesystem::path ExePath, std::wstring CommandLine) -{ - STARTUPINFOEX StartupInfo = {sizeof(STARTUPINFOEX)}; - PROCESS_INFORMATION ProcessInfo{}; - - std::wstring ExePathNative = ExePath.native(); - std::wstring WorkingDirNative = m_WorkingDirectory.native(); - - BOOL Created = ::CreateProcess(ExePathNative.data() /* ApplicationName */, - CommandLine.data() /* Command Line */, - nullptr /* Process Attributes */, - nullptr /* Security Attributes */, - FALSE /* InheritHandles */, - 0 /* Flags */, - nullptr /* Environment */, - WorkingDirNative.data() /* Current Directory */, - (LPSTARTUPINFO)&StartupInfo, - &ProcessInfo); - - if (!Created) - { - throw std::system_error(::GetLastError(), std::system_category(), fmt::format("Failed to create process '{}'", ExePath).c_str()); - } - - m_ProcessId = ProcessInfo.dwProcessId; - m_ProcessHandle.Attach(ProcessInfo.hProcess); - ::CloseHandle(ProcessInfo.hThread); - - ZEN_INFO("Created process {}", m_ProcessId); - - return true; -} - -bool -BasicFunctionJob::Wait(uint32_t TimeoutMs) -{ - if (!m_ProcessHandle) - { - return true; - } - - DWORD WaitResult = WaitForSingleObject(m_ProcessHandle, TimeoutMs); - - if (WaitResult == WAIT_TIMEOUT) - { - return false; - } - - if (WaitResult == WAIT_OBJECT_0) - { - return true; - } - - throw std::runtime_error("Failed wait on process handle"); -} - -int -BasicFunctionJob::ExitCode() -{ - DWORD Ec = 0; - BOOL Success = GetExitCodeProcess(m_ProcessHandle, &Ec); - - if (!Success) - { - ZEN_WARN("failed getting exit code"); - } - - if (Ec == STILL_ACTIVE) - { - ZEN_WARN("getting exit code but process is STILL_ACTIVE"); - } - - return gsl::narrow_cast<int>(Ec); -} - -//////////////////////////////////////////////////////////////////////////////// - -struct SandboxedFunctionJob -{ - SandboxedFunctionJob() = default; - ~SandboxedFunctionJob() = default; - - void SetWorkingDirectory(const std::filesystem::path& WorkingDirectory) { m_WorkingDirectory = WorkingDirectory; } - void Initialize(std::string_view AppContainerId); - bool SpawnJob(std::filesystem::path ExePath); - void AddWhitelistFile(const std::filesystem::path& FilePath) { m_WhitelistFiles.push_back(FilePath); } - -private: - bool GrantNamedObjectAccess(PWSTR Name, SE_OBJECT_TYPE Type, ACCESS_MASK AccessMask, bool Recursive); - - std::filesystem::path m_WorkingDirectory; - std::vector<std::filesystem::path> m_WhitelistFiles; - std::vector<std::wstring> m_WhitelistRegistryKeys; - PSID m_AppContainerSid = nullptr; - bool m_IsInitialized = false; -}; - -bool -SandboxedFunctionJob::GrantNamedObjectAccess(PWSTR ObjectName, SE_OBJECT_TYPE ObjectType, ACCESS_MASK AccessMask, bool Recursive) -{ - DWORD Status; - PACL NewAcl = nullptr; - - DWORD grfInhericance = 0; - - if (Recursive) - { - grfInhericance = OBJECT_INHERIT_ACE | CONTAINER_INHERIT_ACE; - } - - EXPLICIT_ACCESS Access{.grfAccessPermissions = AccessMask, - .grfAccessMode = GRANT_ACCESS, - .grfInheritance = grfInhericance, - .Trustee = {.pMultipleTrustee = nullptr, - .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE, - .TrusteeForm = TRUSTEE_IS_SID, - .TrusteeType = TRUSTEE_IS_GROUP, - .ptstrName = (PWSTR)m_AppContainerSid}}; - - PACL OldAcl = nullptr; - - Status = GetNamedSecurityInfo(ObjectName /* ObjectName */, - ObjectType /* ObjectType */, - DACL_SECURITY_INFORMATION /* SecurityInfo */, - nullptr /* ppsidOwner */, - nullptr /* ppsidGroup */, - &OldAcl /* ppDacl */, - nullptr /* ppSacl */, - nullptr /* ppSecurityDescriptor */); - if (Status != ERROR_SUCCESS) - return false; - - Status = SetEntriesInAcl(1 /* CountOfExplicitEntries */, &Access /* pListOfExplicitEntries */, OldAcl, &NewAcl); - if (Status != ERROR_SUCCESS) - return false; - - Status = SetNamedSecurityInfo(ObjectName /* ObjectName */, - ObjectType /* ObjectType */, - DACL_SECURITY_INFORMATION /*SecurityInfo */, - nullptr /* psidOwner */, - nullptr /* psidGroup */, - NewAcl /* pDacl */, - nullptr /* pSacl */); - if (NewAcl) - ::LocalFree(NewAcl); - - return Status == ERROR_SUCCESS; -} - -void -SandboxedFunctionJob::Initialize(std::string_view AppContainerId) -{ - if (m_IsInitialized) - { - return; - } - - std::wstring ContainerName = zen::Utf8ToWide(AppContainerId); - - HRESULT hRes = ::CreateAppContainerProfile(ContainerName.c_str(), - ContainerName.c_str() /* Display Name */, - ContainerName.c_str() /* Description */, - nullptr /* Capabilities */, - 0 /* Capability Count */, - &m_AppContainerSid); - - if (FAILED(hRes)) - { - hRes = ::DeriveAppContainerSidFromAppContainerName(ContainerName.c_str(), &m_AppContainerSid); - - if (FAILED(hRes)) - { - ZEN_ERROR("Failed creating app container SID"); - } - } - - // Debugging context - - PWSTR Str = nullptr; - ::ConvertSidToStringSid(m_AppContainerSid, &Str); - - ZEN_INFO("AppContainer SID : '{}'", WideToUtf8(Str)); - - PWSTR Path = nullptr; - if (SUCCEEDED(::GetAppContainerFolderPath(Str, &Path))) - { - ZEN_INFO("AppContainer folder: '{}'", WideToUtf8(Path)); - - ::CoTaskMemFree(Path); - } - ::LocalFree(Str); - - m_IsInitialized = true; -} - -bool -SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath) -{ - // Build process attributes - - SECURITY_CAPABILITIES Sc = {0}; - Sc.AppContainerSid = m_AppContainerSid; - - STARTUPINFOEX StartupInfo = {sizeof(STARTUPINFOEX)}; - PROCESS_INFORMATION ProcessInfo{}; - SIZE_T Size = 0; - - ::InitializeProcThreadAttributeList(nullptr, 1, 0, &Size); - - auto AttrBuffer = std::make_unique<uint8_t[]>(Size); - StartupInfo.lpAttributeList = reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(AttrBuffer.get()); - - if (!::InitializeProcThreadAttributeList(StartupInfo.lpAttributeList, 1, 0, &Size)) - { - return false; - } - - if (!::UpdateProcThreadAttribute(StartupInfo.lpAttributeList, - 0, - PROC_THREAD_ATTRIBUTE_SECURITY_CAPABILITIES, - &Sc, - sizeof Sc, - nullptr, - nullptr)) - { - return false; - } - - // Set up security for files/folders/registry - - for (const std::filesystem::path& File : m_WhitelistFiles) - { - std::wstring NativeFileName = File.native(); - GrantNamedObjectAccess(NativeFileName.data(), SE_FILE_OBJECT, FILE_ALL_ACCESS, true); - } - - for (std::wstring& RegKey : m_WhitelistRegistryKeys) - { - GrantNamedObjectAccess(RegKey.data(), SE_REGISTRY_WOW64_32KEY, KEY_ALL_ACCESS, true); - } - - std::wstring ExePathNative = ExePath.native(); - std::wstring WorkingDirNative = m_WorkingDirectory.native(); - - BOOL Created = ::CreateProcess(nullptr /* ApplicationName */, - ExePathNative.data() /* Command line */, - nullptr /* Process Attributes */, - nullptr /* Security Attributes */, - FALSE /* InheritHandles */, - EXTENDED_STARTUPINFO_PRESENT | CREATE_NEW_CONSOLE /* Flags */, - nullptr /* Environment */, - WorkingDirNative.data() /* Current Directory */, - (LPSTARTUPINFO)&StartupInfo, - &ProcessInfo); - - DeleteProcThreadAttributeList(StartupInfo.lpAttributeList); - - if (!Created) - { - return false; - } - - ZEN_INFO("Created process {}", ProcessInfo.dwProcessId); - - return true; -} - -//////////////////////////////////////////////////////////////////////////////// - -HttpFunctionService::HttpFunctionService(CasStore& Store, - CidStore& InCidStore, - const std::filesystem::path& BaseDir, - const CloudCacheClientOptions& ComputeOptions, - const CloudCacheClientOptions& StorageOptions, - const UpstreamAuthConfig& ComputeAuthConfig, - const UpstreamAuthConfig& StorageAuthConfig, - AuthMgr& Mgr) -: m_Log(logging::Get("apply")) -, m_CasStore(Store) -, m_CidStore(InCidStore) -, m_SandboxPath(BaseDir / "scratch") -, m_FunctionPath(BaseDir / "func") -{ - m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore); - - auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, - ComputeAuthConfig, - StorageOptions, - StorageAuthConfig, - m_CasStore, - m_CidStore, - Mgr); - m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); - m_UpstreamApply->Initialize(); - - m_Router.AddPattern("job", "([[:digit:]]+)"); - m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); - m_Router.AddPattern("action", "([[:xdigit:]]{40})"); - - m_Router.RegisterRoute( - "workers/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - RwLock::SharedLockScope _(m_WorkerLock); - - if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - else - { - const WorkerDesc& Desc = It->second; - return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor); - } - } - break; - - case HttpVerb::kPost: - { - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - CbObject FunctionSpec = HttpReq.ReadPayloadObject(); - - // Determine which pieces are missing and need to be transmitted to populate CAS - - CasChunkSet ChunkSet; - - FunctionSpec.IterateAttachments([&](CbFieldView Field) { - const IoHash Hash = Field.AsHash(); - ChunkSet.AddChunkToSet(Hash); - }); - - // Note that we store executables uncompressed to make it - // more straightforward and efficient to materialize them, hence - // the CAS lookup here instead of CID for the input payloads - - m_CasStore.FilterChunks(ChunkSet); - - if (ChunkSet.IsEmpty()) - { - RwLock::ExclusiveLockScope _(m_WorkerLock); - - m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); - - ZEN_DEBUG("worker {}: all attachments already available", WorkerId); - - return HttpReq.WriteResponse(HttpResponseCode::NoContent); - } - else - { - CbObjectWriter ResponseWriter; - ResponseWriter.BeginArray("need"); - - ChunkSet.IterateChunks([&](const IoHash& Hash) { - ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); - - ResponseWriter.AddHash(Hash); - }); - - ResponseWriter.EndArray(); - - ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); - } - } - break; - - case HttpContentType::kCbPackage: - { - CbPackage FunctionSpec = HttpReq.ReadPayloadPackage(); - - CbObject Obj = FunctionSpec.GetObject(); - - std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - SharedBuffer Decompressed = DataView.Decompress(); - const uint64_t DecompressedSize = DataView.GetRawSize(); - - ZEN_UNUSED(DataHash); - - TotalAttachmentBytes += DecompressedSize; - ++AttachmentCount; - - // Note that we store executables uncompressed to make it - // more straightforward and efficient to materialize them - - const CasStore::InsertResult InsertResult = - m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash())); - - if (InsertResult.New) - { - TotalNewBytes += DecompressedSize; - ++NewAttachmentCount; - } - } - - ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", - WorkerId, - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - RwLock::ExclusiveLockScope _(m_WorkerLock); - - m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj}); - - return HttpReq.WriteResponse(HttpResponseCode::NoContent); - } - break; - - default: - break; - } - } - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/{job}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - break; - - case HttpVerb::kPost: - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/{worker}/{action}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - CbPackage Output; - HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - } - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "jobs/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - WorkerDesc Worker; - - { - RwLock::SharedLockScope _(m_WorkerLock); - - if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - else - { - Worker = It->second; - } - } - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - // TODO: return status of all pending or executing jobs - break; - - case HttpVerb::kPost: - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - // This operation takes the proposed job spec and identifies which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response - - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject RequestObject = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - RequestObject.IterateAttachments([&](CbFieldView Field) { - const IoHash FileHash = Field.AsHash(); - - if (!m_CidStore.ContainsChunk(FileHash)) - { - NeedList.push_back(FileHash); - } - }); - - if (NeedList.empty()) - { - // We already have everything - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output); - - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - CbObject Response = Cbo.Save(); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); - } - break; - - case HttpContentType::kCbPackage: - { - CbPackage Action = HttpReq.ReadPayloadPackage(); - CbObject ActionObj = Action.GetObject(); - - std::span<const CbAttachment> Attachments = Action.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); - - const uint64_t CompressedSize = DataView.GetCompressedSize(); - - TotalAttachmentBytes += CompressedSize; - ++AttachmentCount; - - const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); - - if (InsertResult.New) - { - TotalNewBytes += CompressedSize; - ++NewAttachmentCount; - } - } - - ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)", - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output); - - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - - default: - break; - } - break; - - default: - break; - } - }, - HttpVerb::kPost); -} - -HttpFunctionService::~HttpFunctionService() -{ -} - -const char* -HttpFunctionService::BaseUri() const -{ - return "/apply/"; -} - -void -HttpFunctionService::HandleRequest(HttpServerRequest& Request) -{ - if (m_Router.HandleRequest(Request) == false) - { - ZEN_WARN("No route found for {0}", Request.RelativeUri()); - } -} - -std::filesystem::path -HttpFunctionService::CreateNewSandbox() -{ - std::string UniqueId = std::to_string(++m_SandboxCount); - std::filesystem::path Path = m_SandboxPath / UniqueId; - zen::CreateDirectories(Path); - return Path; -} - -CbPackage -HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) -{ - using namespace std::literals; - - std::filesystem::path SandboxPath = CreateNewSandbox(); - - CbObject Desc = Worker.Descriptor; - - // Manifest worker in Sandbox - - for (auto& It : Desc["executables"]) - { - CbObjectView ExecEntry = It.AsObjectView(); - - std::string_view Name = ExecEntry["name"sv].AsString(); - const IoHash ChunkHash = ExecEntry["hash"sv].AsHash(); - const uint64_t Size = ExecEntry["size"sv].AsUInt64(); - - std::filesystem::path FilePath{SandboxPath / Name}; - IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash); - - if (!DataBuffer) - { - throw std::runtime_error(fmt::format("worker CAS chunk '{}' missing", ChunkHash)); - } - - if (DataBuffer.Size() != Size) - { - throw std::runtime_error( - fmt::format("worker CAS chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size)); - } - - zen::WriteFile(FilePath, DataBuffer); - } - - for (auto& It : Desc["dirs"]) - { - std::string_view Name = It.AsString(); - std::filesystem::path DirPath{SandboxPath / Name}; - zen::CreateDirectories(DirPath); - } - - for (auto& It : Desc["files"]) - { - CbObjectView FileEntry = It.AsObjectView(); - - std::string_view Name = FileEntry["name"sv].AsString(); - const IoHash ChunkHash = FileEntry["hash"sv].AsHash(); - const uint64_t Size = FileEntry["size"sv].AsUInt64(); - - std::filesystem::path FilePath{SandboxPath / Name}; - IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash); - - if (!DataBuffer) - { - throw std::runtime_error(fmt::format("worker CAS chunk '{}' missing", ChunkHash)); - } - - if (DataBuffer.Size() != Size) - { - throw std::runtime_error( - fmt::format("worker CAS chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size)); - } - - zen::WriteFile(FilePath, DataBuffer); - } - - // Write out action - - zen::WriteFile(SandboxPath / "build.action", Action.GetBuffer().AsIoBuffer()); - - // Manifest inputs in sandbox - - Action.IterateAttachments([&](CbFieldView Field) { - const IoHash Cid = Field.AsHash(); - std::filesystem::path FilePath{SandboxPath / "Inputs" / Cid.ToHexString()}; - IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); - - if (!DataBuffer) - { - throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid)); - } - - zen::WriteFile(FilePath, DataBuffer); - }); - - // Set up environment variables - - StringBuilder<1024> EnvironmentBlock; - - for (auto& It : Desc["environment"]) - { - EnvironmentBlock.Append(It.AsString()); - EnvironmentBlock.Append('\0'); - } - EnvironmentBlock.Append('\0'); - EnvironmentBlock.Append('\0'); - - // Execute process - - std::string_view ExecPath = Desc["path"].AsString(); - std::filesystem::path ExePath = SandboxPath / ExecPath; - - WideStringBuilder<512> CommandLine; - CommandLine.Append(L'"'); - CommandLine.Append(ExePath.c_str()); - CommandLine.Append(L'"'); - CommandLine.Append(L" -Build=build.action"); - - LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; - LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; - BOOL bInheritHandles = FALSE; - DWORD dwCreationFlags = 0; - - STARTUPINFO StartupInfo{}; - StartupInfo.cb = sizeof StartupInfo; - - PROCESS_INFORMATION ProcessInformation{}; - - BOOL Success = CreateProcessW(nullptr, - CommandLine.Data(), - lpProcessAttributes, - lpThreadAttributes, - bInheritHandles, - dwCreationFlags, - (LPVOID)EnvironmentBlock.Data(), // Environment block - SandboxPath.c_str(), // Current directory - &StartupInfo, - /* out */ &ProcessInformation); - - if (!Success) - { - zen::ThrowLastError("Unable to launch process" /* TODO: Add context */); - } - - CloseHandle(ProcessInformation.hThread); - auto _ = MakeGuard([&] { CloseHandle(ProcessInformation.hProcess); }); - - DWORD Result = WaitForSingleObject(ProcessInformation.hProcess, INFINITE); - - if (Result != WAIT_OBJECT_0) - { - zen::ThrowLastError("Process wait failed" /* TODO: Add context */); - } - - DWORD ExitCode = 0; - GetExitCodeProcess(ProcessInformation.hProcess, &ExitCode); - - // Gather outputs - - FileContents OutputData = zen::ReadFile(SandboxPath / "build.output"); - - if (OutputData.ErrorCode) - { - throw std::system_error(OutputData.ErrorCode, "Failed to read build output file"); - } - - // TODO: should have a more straightforward way to perform this - ZEN_ASSERT(OutputData.Data.size() == 1); - - CbPackage OutputPackage; - CbObject Output = zen::LoadCompactBinaryObject(OutputData.Data[0]); - - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalRawAttachmentBytes = 0; - - Output.IterateAttachments([&](CbFieldView Field) { - IoHash Hash = Field.AsHash(); - std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; - - FileContents ChunkData = zen::ReadFile(OutputPath); - - if (ChunkData.ErrorCode) - { - throw std::system_error(ChunkData.ErrorCode, "Failed to read build output chunk file"); - } - - ZEN_ASSERT(OutputData.Data.size() == 1); - - CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0])); - - if (!AttachmentBuffer) - { - throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)"); - } - - TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); - TotalRawAttachmentBytes += AttachmentBuffer.GetCompressedSize(); - - CbAttachment Attachment(AttachmentBuffer); - OutputPackage.AddAttachment(Attachment); - }); - - OutputPackage.SetObject(Output); - - ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)", - OutputPackage.GetAttachments().size(), - NiceBytes(TotalAttachmentBytes), - NiceBytes(TotalRawAttachmentBytes)); - - return OutputPackage; -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object) -{ - const IoHash WorkerId = Worker.Descriptor.GetHash(); - const IoHash ActionId = Action.GetHash(); - - Action.MakeOwned(); - - ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); - - auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)}); - - if (!EnqueueResult.Success) - { - ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString()); - return HttpResponseCode::InternalServerError; - } - - CbObjectWriter Writer; - Writer.AddHash("worker", WorkerId); - Writer.AddHash("action", ActionId); - - Object = std::move(Writer.Save()); - return HttpResponseCode::OK; -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) -{ - auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); - if (!Status.Success) - { - // throw std::runtime_error(fmt::format("Action {}/{} not found", WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); - return HttpResponseCode::NotFound; - } - - if (Status.Status.State != UpstreamApplyState::Complete) - { - return HttpResponseCode::Accepted; - } - - GetUpstreamApplyResult& Completed = Status.Status.Result; - if (!Completed.Success || Completed.Error.ErrorCode != 0) - { - ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", - WorkerId.ToHexString(), - ActionId.ToHexString(), - Completed.StdOut, - Completed.StdErr, - Completed.Error.Reason, - Completed.Error.ErrorCode); - - return HttpResponseCode::InternalServerError; - } - - ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", - WorkerId.ToHexString(), - ActionId.ToHexString(), - Completed.OutputPackage.GetAttachments().size(), - NiceBytes(Completed.TotalAttachmentBytes), - NiceBytes(Completed.TotalRawAttachmentBytes)); - - Package = std::move(Completed.OutputPackage); - return HttpResponseCode::OK; -} - -} // namespace zen - -#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp new file mode 100644 index 000000000..9af3efcec --- /dev/null +++ b/zenserver/compute/function.cpp @@ -0,0 +1,473 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "function.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <upstream/jupiter.h> +# include <upstream/upstreamapply.h> +# include <upstream/upstreamcache.h> +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iobuffer.h> +# include <zencore/iohash.h> +# include <zencore/scopeguard.h> +# include <zenstore/cas.h> +# include <zenstore/cidstore.h> + +# include <span> + +using namespace std::literals; + +namespace zen { + +HttpFunctionService::HttpFunctionService(CasStore& Store, + CidStore& InCidStore, + const CloudCacheClientOptions& ComputeOptions, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const UpstreamAuthConfig& StorageAuthConfig, + AuthMgr& Mgr) +: m_Log(logging::Get("apply")) +, m_CasStore(Store) +, m_CidStore(InCidStore) +{ + m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore); + + auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, + ComputeAuthConfig, + StorageOptions, + StorageAuthConfig, + m_CasStore, + m_CidStore, + Mgr); + m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); + m_UpstreamApply->Initialize(); + + m_Router.AddPattern("job", "([[:digit:]]+)"); + m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); + m_Router.AddPattern("action", "([[:xdigit:]]{40})"); + + m_Router.RegisterRoute( + "workers/{worker}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + else + { + const WorkerDesc& Desc = It->second; + return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor); + } + } + break; + + case HttpVerb::kPost: + { + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + CbObject FunctionSpec = HttpReq.ReadPayloadObject(); + + // Determine which pieces are missing and need to be transmitted to populate CAS + + CasChunkSet ChunkSet; + + FunctionSpec.IterateAttachments([&](CbFieldView Field) { + const IoHash Hash = Field.AsHash(); + ChunkSet.AddChunkToSet(Hash); + }); + + // Note that we store executables uncompressed to make it + // more straightforward and efficient to materialize them, hence + // the CAS lookup here instead of CID for the input payloads + + m_CasStore.FilterChunks(ChunkSet); + + if (ChunkSet.IsEmpty()) + { + RwLock::ExclusiveLockScope _(m_WorkerLock); + + m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); + + ZEN_DEBUG("worker {}: all attachments already available", WorkerId); + + return HttpReq.WriteResponse(HttpResponseCode::NoContent); + } + else + { + CbObjectWriter ResponseWriter; + ResponseWriter.BeginArray("need"); + + ChunkSet.IterateChunks([&](const IoHash& Hash) { + ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); + + ResponseWriter.AddHash(Hash); + }); + + ResponseWriter.EndArray(); + + ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); + } + } + break; + + case HttpContentType::kCbPackage: + { + CbPackage FunctionSpec = HttpReq.ReadPayloadPackage(); + + CbObject Obj = FunctionSpec.GetObject(); + + std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + SharedBuffer Decompressed = DataView.Decompress(); + const uint64_t DecompressedSize = DataView.GetRawSize(); + + ZEN_UNUSED(DataHash); + + TotalAttachmentBytes += DecompressedSize; + ++AttachmentCount; + + // Note that we store executables uncompressed to make it + // more straightforward and efficient to materialize them + + const CasStore::InsertResult InsertResult = + m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash())); + + if (InsertResult.New) + { + TotalNewBytes += DecompressedSize; + ++NewAttachmentCount; + } + } + + ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", + WorkerId, + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + RwLock::ExclusiveLockScope _(m_WorkerLock); + + m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj}); + + return HttpReq.WriteResponse(HttpResponseCode::NoContent); + } + break; + + default: + break; + } + } + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( + "jobs/{job}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + break; + + case HttpVerb::kPost: + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( + "jobs/{worker}/{action}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + CbPackage Output; + HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + } + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/{worker}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + WorkerDesc Worker; + + { + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + else + { + Worker = It->second; + } + } + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + // TODO: return status of all pending or executing jobs + break; + + case HttpVerb::kPost: + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + // This operation takes the proposed job spec and identifies which + // chunks are not present on this server. This list is then returned in + // the "need" list in the response + + IoBuffer Payload = HttpReq.ReadPayload(); + CbObject RequestObject = LoadCompactBinaryObject(Payload); + + std::vector<IoHash> NeedList; + + RequestObject.IterateAttachments([&](CbFieldView Field) { + const IoHash FileHash = Field.AsHash(); + + if (!m_CidStore.ContainsChunk(FileHash)) + { + NeedList.push_back(FileHash); + } + }); + + if (NeedList.empty()) + { + // We already have everything + CbObject Output; + HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output); + + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + CbObject Response = Cbo.Save(); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); + } + break; + + case HttpContentType::kCbPackage: + { + CbPackage Action = HttpReq.ReadPayloadPackage(); + CbObject ActionObj = Action.GetObject(); + + std::span<const CbAttachment> Attachments = Action.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + + ZEN_UNUSED(DataHash); + + const uint64_t CompressedSize = DataView.GetCompressedSize(); + + TotalAttachmentBytes += CompressedSize; + ++AttachmentCount; + + const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); + + if (InsertResult.New) + { + TotalNewBytes += CompressedSize; + ++NewAttachmentCount; + } + } + + ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)", + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + CbObject Output; + HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output); + + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + + default: + break; + } + break; + + default: + break; + } + }, + HttpVerb::kPost); +} + +HttpFunctionService::~HttpFunctionService() +{ +} + +const char* +HttpFunctionService::BaseUri() const +{ + return "/apply/"; +} + +void +HttpFunctionService::HandleRequest(HttpServerRequest& Request) +{ + if (m_Router.HandleRequest(Request) == false) + { + ZEN_WARN("No route found for {0}", Request.RelativeUri()); + } +} + +HttpResponseCode +HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object) +{ + const IoHash WorkerId = Worker.Descriptor.GetHash(); + const IoHash ActionId = Action.GetHash(); + + Action.MakeOwned(); + + ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); + + auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)}); + + if (!EnqueueResult.Success) + { + ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString()); + return HttpResponseCode::InternalServerError; + } + + CbObjectWriter Writer; + Writer.AddHash("worker", WorkerId); + Writer.AddHash("action", ActionId); + + Object = Writer.Save(); + return HttpResponseCode::OK; +} + +HttpResponseCode +HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) +{ + auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); + if (!Status.Success) + { + // throw std::runtime_error(fmt::format("Action {}/{} not found", WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); + return HttpResponseCode::NotFound; + } + + if (Status.Status.State != UpstreamApplyState::Complete) + { + return HttpResponseCode::Accepted; + } + + GetUpstreamApplyResult& Completed = Status.Status.Result; + if (!Completed.Success || Completed.Error.ErrorCode != 0) + { + ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.StdOut, + Completed.StdErr, + Completed.Error.Reason, + Completed.Error.ErrorCode); + + return HttpResponseCode::InternalServerError; + } + + ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.OutputPackage.GetAttachments().size(), + NiceBytes(Completed.TotalAttachmentBytes), + NiceBytes(Completed.TotalRawAttachmentBytes)); + + Package = std::move(Completed.OutputPackage); + return HttpResponseCode::OK; +} + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/zenserver/compute/apply.h b/zenserver/compute/function.h index e00afcd61..962927f1c 100644 --- a/zenserver/compute/apply.h +++ b/zenserver/compute/function.h @@ -37,7 +37,6 @@ class HttpFunctionService : public HttpService public: HttpFunctionService(CasStore& Store, CidStore& InCidStore, - const std::filesystem::path& BaseDir, const CloudCacheClientOptions& ComputeOptions, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& ComputeAuthConfig, @@ -54,9 +53,6 @@ private: HttpRequestRouter m_Router; CasStore& m_CasStore; CidStore& m_CidStore; - std::filesystem::path m_SandboxPath; - std::filesystem::path m_FunctionPath; - std::atomic<int> m_SandboxCount{0}; std::unique_ptr<UpstreamApply> m_UpstreamApply; struct WorkerDesc @@ -64,10 +60,8 @@ private: CbObject Descriptor; }; - [[nodiscard]] std::filesystem::path CreateNewSandbox(); - [[nodiscard]] CbPackage ExecAction(const WorkerDesc& Worker, CbObject Action); - [[nodiscard]] HttpResponseCode ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object); - [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package); + [[nodiscard]] HttpResponseCode ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object); + [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package); RwLock m_WorkerLock; std::unordered_map<IoHash, WorkerDesc> m_WorkerMap; diff --git a/zenserver/config.cpp b/zenserver/config.cpp index adb079d83..b7fc18b4e 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -652,6 +652,11 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio } } + if (sol::optional<sol::table> ExecConfig = lua["exec"]) + { + ServerOptions.ExecServiceEnabled = ExecConfig->get_or("enable", ServerOptions.ExecServiceEnabled); + } + if (sol::optional<sol::table> ComputeConfig = lua["compute"]) { ServerOptions.ComputeServiceEnabled = ComputeConfig->get_or("enable", ServerOptions.ComputeServiceEnabled); diff --git a/zenserver/config.h b/zenserver/config.h index a7a7815a8..a61a7f89f 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -15,10 +15,6 @@ # define ZEN_USE_NAMED_PIPES 0 #endif -#ifndef ZEN_USE_EXEC -# define ZEN_USE_EXEC 0 -#endif - struct ZenUpstreamJupiterConfig { std::string Name; @@ -120,6 +116,7 @@ struct ZenServerOptions bool IsTest = false; bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements bool StructuredCacheEnabled = true; + bool ExecServiceEnabled = true; bool ComputeServiceEnabled = true; bool ShouldCrash = false; // Option for testing crash handling bool IsFirstRun = false; diff --git a/zenserver/testing/launch.cpp b/zenserver/testing/launch.cpp index f315ec1b4..1236e6adb 100644 --- a/zenserver/testing/launch.cpp +++ b/zenserver/testing/launch.cpp @@ -2,7 +2,7 @@ #include "launch.h" -#if ZEN_WITH_COMPUTE_SERVICES +#if ZEN_WITH_EXEC_SERVICES # include <zencore/compactbinary.h> # include <zencore/compactbinarybuilder.h> @@ -167,10 +167,10 @@ SandboxedJob::GrantNamedObjectAccess(PWSTR ObjectName, SE_OBJECT_TYPE ObjectType .grfAccessMode = GRANT_ACCESS, .grfInheritance = grfInhericance, .Trustee = {.pMultipleTrustee = nullptr, - .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE, - .TrusteeForm = TRUSTEE_IS_SID, - .TrusteeType = TRUSTEE_IS_GROUP, - .ptstrName = (PWSTR)m_AppContainerSid}}; + .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE, + .TrusteeForm = TRUSTEE_IS_SID, + .TrusteeType = TRUSTEE_IS_GROUP, + .ptstrName = (PWSTR)m_AppContainerSid}}; PACL OldAcl = nullptr; @@ -548,4 +548,4 @@ HttpLaunchService::CreateNewSandbox() } // namespace zen -#endif // ZEN_WITH_COMPUTE_SERVICES +#endif // ZEN_WITH_EXEC_SERVICES diff --git a/zenserver/testing/launch.h b/zenserver/testing/launch.h index 925fa18b0..6fd3e39ae 100644 --- a/zenserver/testing/launch.h +++ b/zenserver/testing/launch.h @@ -4,11 +4,11 @@ #include <zencore/zencore.h> -#if !defined(ZEN_WITH_COMPUTE_SERVICES) -# define ZEN_WITH_COMPUTE_SERVICES ZEN_PLATFORM_WINDOWS +#if !defined(ZEN_WITH_EXEC_SERVICES) +# define ZEN_WITH_EXEC_SERVICES ZEN_PLATFORM_WINDOWS #endif -#if ZEN_WITH_COMPUTE_SERVICES +#if ZEN_WITH_EXEC_SERVICES # include <zencore/logging.h> # include <zenhttp/httpserver.h> @@ -45,4 +45,4 @@ private: } // namespace zen -#endif // ZEN_WITH_COMPUTE_SERVICES +#endif // ZEN_WITH_EXEC_SERVICES diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 918697224..fd304adb8 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -59,9 +59,9 @@ namespace detail { CidStore& CidStore, AuthMgr& Mgr) : m_Log(logging::Get("upstream-apply")) - , m_AuthMgr(Mgr) , m_CasStore(CasStore) , m_CidStore(CidStore) + , m_AuthMgr(Mgr) { m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl); m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString()); @@ -434,9 +434,6 @@ namespace detail { CbObjectView Status = It.AsObjectView(); const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); - const std::string_view AgentId = TaskStatus["a"sv].AsString(); - const std::string_view LeaseId = TaskStatus["l"sv].AsString(); - // Only care about completed tasks if (State != ComputeTaskState::Complete) { @@ -486,10 +483,10 @@ namespace detail { private: spdlog::logger& Log() { return m_Log; } + spdlog::logger& m_Log; CasStore& m_CasStore; CidStore& m_CidStore; AuthMgr& m_AuthMgr; - spdlog::logger& m_Log; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; RefPtr<CloudCacheClient> m_StorageClient; @@ -532,11 +529,7 @@ namespace detail { return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}", ComputeTaskOutcomeToString(Outcome))}}; } - const IoHash TaskId = TaskStatus["h"sv].AsHash(); - const DateTime Time = TaskStatus["t"sv].AsDateTime(); - const IoHash ResultHash = TaskStatus["r"sv].AsHash(); - const std::string_view AgentId = TaskStatus["a"sv].AsString(); - const std::string_view LeaseId = TaskStatus["l"sv].AsString(); + const IoHash ResultHash = TaskStatus["r"sv].AsHash(); int64_t Bytes{}; double ElapsedSeconds{}; @@ -828,7 +821,7 @@ namespace detail { { std::string_view Env = It.AsString(); auto Index = Env.find('='); - if (Index < 0) + if (Index == std::string_view::npos) { Log().warn("process apply upstream FAILED, environment '{}' malformed", Env); return false; @@ -910,7 +903,7 @@ namespace detail { } if (Memory > 0) { - Resources["RAM"sv] = std::max(Memory / 1024 / 1024 / 1024, 1LL); + Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL); } CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive); @@ -1063,7 +1056,7 @@ namespace detail { DirectoryTreeWriter.EndArray(); } - return std::move(DirectoryTreeWriter.Save()); + return DirectoryTreeWriter.Save(); } [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, @@ -1085,7 +1078,7 @@ namespace detail { Writer.EndArray(); } Writer.AddBool("e", Exclusive); - return std::move(Writer.Save()); + return Writer.Save(); } [[nodiscard]] CbObject BuildTask(const std::string_view Executable, @@ -1141,7 +1134,7 @@ namespace detail { TaskWriter.EndArray(); } - return std::move(TaskWriter.Save()); + return TaskWriter.Save(); } }; } // namespace detail @@ -1154,46 +1147,33 @@ struct UpstreamApplyStats UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {} - void Add(spdlog::logger& Logger, - UpstreamApplyEndpoint& Endpoint, - const PostUpstreamApplyResult& Result, - const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) + void Add(UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result) { UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) { - Stats.ErrorCount++; + Stats.ErrorCount.Increment(1); } else if (Result.Success) { - Stats.PostCount++; - Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); - Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); - } - - if (m_Enabled && m_SampleCount++ % MaxSampleCount) - { - Dump(Logger, Endpoints); + Stats.PostCount.Increment(1); + Stats.UpBytes.Increment(Result.Bytes / 1024 / 1024); } } - void Add(spdlog::logger& Logger, - UpstreamApplyEndpoint& Endpoint, - const GetUpstreamApplyUpdatesResult& Result, - const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) + void Add(UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result) { UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) { - Stats.ErrorCount++; + Stats.ErrorCount.Increment(1); } else if (Result.Success) { - Stats.UpdateCount++; - Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); - Stats.SecondsDown.fetch_add(Result.ElapsedSeconds); + Stats.UpdateCount.Increment(1); + Stats.DownBytes.Increment(Result.Bytes / 1024 / 1024); if (!Result.Completed.empty()) { uint64_t Completed = 0; @@ -1201,47 +1181,12 @@ struct UpstreamApplyStats { Completed += It.second.size(); } - Stats.CompleteCount.fetch_add(Completed); + Stats.CompleteCount.Increment(Completed); } } - - if (m_Enabled && m_SampleCount++ % MaxSampleCount) - { - Dump(Logger, Endpoints); - } - } - - void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) - { - for (auto& Ep : Endpoints) - { - // These stats will not be totally correct as the numbers are not captured atomically - - UpstreamApplyEndpointStats& Stats = Ep->Stats(); - const uint64_t PostCount = Stats.PostCount; - const uint64_t CompleteCount = Stats.CompleteCount; - // const uint64_t UpdateCount = Stats.UpdateCount; - const double DownBytes = Stats.DownBytes; - const double SecondsDown = Stats.SecondsDown; - const double UpBytes = Stats.UpBytes; - const double SecondsUp = Stats.SecondsUp; - - const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0; - const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0; - const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; - - Logger.debug("STATS - '{}', Complete rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->DisplayName(), - CompleteRate, - DownBytes, - DownSpeed, - UpBytes, - UpSpeed); - } } - bool m_Enabled; - std::atomic_uint64_t m_SampleCount = {}; + bool m_Enabled; }; ////////////////////////////////////////////////////////////////////////// @@ -1364,19 +1309,19 @@ public: Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); UpstreamApplyEndpointStats& Stats = Ep->Stats(); - const uint64_t PostCount = Stats.PostCount; - const uint64_t CompleteCount = Stats.CompleteCount; + const uint64_t PostCount = Stats.PostCount.Value(); + const uint64_t CompleteCount = Stats.CompleteCount.Value(); // const uint64_t UpdateCount = Stats.UpdateCount; const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; Status << "post_count" << PostCount; Status << "complete_count" << PostCount; - Status << "update_count" << Stats.UpdateCount; + Status << "update_count" << Stats.UpdateCount.Value(); Status << "complete_ratio" << CompleteRate; - Status << "downloaded_mb" << Stats.DownBytes; - Status << "uploaded_mb" << Stats.UpBytes; - Status << "error_count" << Stats.ErrorCount; + Status << "downloaded_mb" << Stats.DownBytes.Value(); + Status << "uploaded_mb" << Stats.UpBytes.Value(); + Status << "error_count" << Stats.ErrorCount.Value(); Status.EndObject(); } @@ -1425,7 +1370,7 @@ private: } } } - m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + m_Stats.Add(*Endpoint, Result); return; } } @@ -1476,7 +1421,7 @@ private: if (Endpoint->IsHealthy()) { GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(); - m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + m_Stats.Add(*Endpoint, Result); if (!Result.Success) { diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h index 44c08e30e..63fd771da 100644 --- a/zenserver/upstream/upstreamapply.h +++ b/zenserver/upstream/upstreamapply.h @@ -2,13 +2,12 @@ #pragma once -#include "compute/apply.h" - #if ZEN_WITH_COMPUTE_SERVICES # include <zencore/compactbinarypackage.h> # include <zencore/iobuffer.h> # include <zencore/iohash.h> +# include <zencore/stats.h> # include <zencore/zencore.h> # include <atomic> @@ -106,14 +105,12 @@ struct UpstreamEndpointHealth struct UpstreamApplyEndpointStats { - std::atomic_uint64_t PostCount{}; - std::atomic_uint64_t CompleteCount{}; - std::atomic_uint64_t UpdateCount{}; - std::atomic_uint64_t ErrorCount{}; - std::atomic<double> UpBytes{}; - std::atomic<double> DownBytes{}; - std::atomic<double> SecondsUp{}; - std::atomic<double> SecondsDown{}; + metrics::Counter PostCount; + metrics::Counter CompleteCount; + metrics::Counter UpdateCount; + metrics::Counter ErrorCount; + metrics::Counter UpBytes; + metrics::Counter DownBytes; }; /** diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua index 569e3c150..2174ad679 100644 --- a/zenserver/xmake.lua +++ b/zenserver/xmake.lua @@ -32,6 +32,7 @@ target("zenserver") add_options("vfs") add_options("compute") + add_options("exec") add_packages( "vcpkg::asio", diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index d8e97b117..c9708dba8 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -104,7 +104,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include "auth/authservice.h" #include "cache/structuredcache.h" #include "cache/structuredcachestore.h" -#include "compute/apply.h" +#include "compute/function.h" #include "diag/diagsvcs.h" #include "experimental/usnjournal.h" #include "frontend/frontend.h" @@ -272,21 +272,26 @@ public: m_LocalProjectService = zen::LocalProjectService::New(*m_CasStore, m_ProjectStore); #endif -#if ZEN_USE_EXEC - std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox"; - zen::CreateDirectories(SandboxDir); - m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir); -#endif +#if ZEN_WITH_EXEC_SERVICES -#if ZEN_WITH_COMPUTE_SERVICES - if (ServerOptions.ComputeServiceEnabled) + if (ServerOptions.ExecServiceEnabled) { - ZEN_INFO("instantiating compute services"); + ZEN_INFO("instantiating exec service"); std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox"; zen::CreateDirectories(SandboxDir); m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir); + } + else + { + ZEN_INFO("NOT instantiating exec services"); + } +#endif // ZEN_WITH_EXEC_SERVICES + +#if ZEN_WITH_COMPUTE_SERVICES + if (ServerOptions.ComputeServiceEnabled) + { InitializeCompute(ServerOptions); } else @@ -331,11 +336,18 @@ public: m_Http->RegisterService(m_CasService); +#if ZEN_WITH_EXEC_SERVICES + if (ServerOptions.ExecServiceEnabled) + { + if (m_HttpLaunchService != nullptr) + { + m_Http->RegisterService(*m_HttpLaunchService); + } + } +#endif // ZEN_WITH_EXEC_SERVICES #if ZEN_WITH_COMPUTE_SERVICES if (ServerOptions.ComputeServiceEnabled) { - m_Http->RegisterService(*m_HttpLaunchService); - if (m_HttpFunctionService != nullptr) { m_Http->RegisterService(*m_HttpFunctionService); @@ -365,9 +377,7 @@ public: void InitializeState(const ZenServerOptions& ServerOptions); void InitializeStructuredCache(const ZenServerOptions& ServerOptions); -#if ZEN_WITH_COMPUTE_SERVICES void InitializeCompute(const ZenServerOptions& ServerOptions); -#endif #if ZEN_ENABLE_MESH void StartMesh(int BasePort) @@ -610,16 +620,14 @@ private: zen::HttpAdminService m_AdminService{m_GcScheduler}; zen::HttpHealthService m_HealthService; zen::MeshTracker m_ZenMesh{m_IoContext}; +#if ZEN_WITH_EXEC_SERVICES + std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService; +#endif // ZEN_WITH_EXEC_SERVICES #if ZEN_WITH_COMPUTE_SERVICES - std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService; std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; -#endif +#endif // ZEN_WITH_COMPUTE_SERVICES std::unique_ptr<zen::HttpFrontendService> m_FrontendService; -#if ZEN_USE_EXEC - std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService; -#endif - #if ZEN_USE_NAMED_PIPES zen::Ref<zen::LocalProjectService> m_LocalProjectService; #endif @@ -837,7 +845,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; // Horde compute upstream - if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.Url.empty() == false) + if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.JupiterConfig.Url.empty() == false) { std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name; @@ -868,11 +876,8 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider, .AccessToken = UpstreamConfig.JupiterConfig.AccessToken}; - std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply"; - zen::CreateDirectories(ApplySandboxDir); m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, *m_CidStore, - ApplySandboxDir, ComputeOptions, StorageOptions, ComputeAuthConfig, @@ -880,7 +885,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) *m_AuthMgr); } } -#endif +#endif // ZEN_WITH_COMPUTE_SERVICES //////////////////////////////////////////////////////////////////////////////// diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp index f49d5f6d8..3a4957b76 100644 --- a/zenutil/zenserverprocess.cpp +++ b/zenutil/zenserverprocess.cpp @@ -550,8 +550,8 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr const std::filesystem::path BaseDir = m_Env.ProgramBaseDir(); const std::filesystem::path Executable = BaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL; CreateProcOptions CreateOptions = { - .WorkingDirectory = &CurrentDirectory, - .Flags = CreationFlags, + .WorkingDirectory = &CurrentDirectory, + .Flags = CreationFlags, }; CreateProcResult ChildPid = CreateProc(Executable, CommandLine.ToView(), CreateOptions); #if ZEN_PLATFORM_WINDOWS |