aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Kirchoff <[email protected]>2022-03-22 11:47:38 -0700
committerGitHub <[email protected]>2022-03-22 11:47:38 -0700
commitcc5adf4cb79c92993fabfe09e75dfadb7d4c9665 (patch)
tree4ba0a18f68e39685fa784d872bbb4bb9ba2b6fd7
parentmove workthreadpool to zencore (#63) (diff)
downloadzen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.tar.xz
zen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.zip
Enable Horde compute code on Linux & Mac (#61)
-rw-r--r--xmake.lua9
-rw-r--r--zenserver-test/zenserver-test.cpp10
-rw-r--r--zenserver/compute/apply.cpp992
-rw-r--r--zenserver/compute/function.cpp473
-rw-r--r--zenserver/compute/function.h (renamed from zenserver/compute/apply.h)10
-rw-r--r--zenserver/config.cpp5
-rw-r--r--zenserver/config.h5
-rw-r--r--zenserver/testing/launch.cpp12
-rw-r--r--zenserver/testing/launch.h8
-rw-r--r--zenserver/upstream/upstreamapply.cpp107
-rw-r--r--zenserver/upstream/upstreamapply.h17
-rw-r--r--zenserver/xmake.lua1
-rw-r--r--zenserver/zenserver.cpp53
-rw-r--r--zenutil/zenserverprocess.cpp4
14 files changed, 568 insertions, 1138 deletions
diff --git a/xmake.lua b/xmake.lua
index 13092d7e3..1abe87642 100644
--- a/xmake.lua
+++ b/xmake.lua
@@ -119,12 +119,19 @@ if is_os("windows") then
end
option("compute")
- set_default(is_os("windows"))
+ set_default(true)
set_showmenu(true)
set_description("Enable compute services endpoint")
option_end()
add_define_by_config("ZEN_WITH_COMPUTE_SERVICES", "compute")
+option("exec")
+ set_default(is_os("windows"))
+ set_showmenu(true)
+ set_description("Enable exec services endpoint")
+option_end()
+add_define_by_config("ZEN_WITH_EXEC_SERVICES", "exec")
+
option("zenmesh")
set_default(false)
set_showmenu(true)
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 16f47cd84..a0d97a489 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -58,8 +58,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <asio.hpp>
-#define ZEN_USE_EXEC 0 // Note: this should really be a global define to match the zenserver definition
-
//////////////////////////////////////////////////////////////////////////
#include "projectclient.h"
@@ -2355,7 +2353,7 @@ TEST_CASE("zcache.rpc.allpolicies")
}
}
-# if ZEN_USE_EXEC
+# if ZEN_WITH_EXEC_SERVICES
struct RemoteExecutionRequest
{
@@ -2494,7 +2492,7 @@ private:
TEST_CASE("exec.basic")
{
-# if ZEN_WITH_COMPUTE_SERVICES
+# if ZEN_WITH_EXEC_SERVICES
using namespace std::literals;
std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
@@ -2525,7 +2523,7 @@ TEST_CASE("exec.basic")
CHECK(Result["exitcode"sv].AsInt32(-1) == 1);
}
-# endif // ZEN_WITH_COMPUTE_SERVICES
+# endif // ZEN_WITH_EXEC_SERVICES
}
TEST_CASE("mesh.basic")
@@ -2559,7 +2557,7 @@ TEST_CASE("mesh.basic")
# endif
}
-# endif
+# endif // ZEN_WITH_EXEC_SERVICES
class ZenServerTestHelper
{
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
deleted file mode 100644
index 694e9f662..000000000
--- a/zenserver/compute/apply.cpp
+++ /dev/null
@@ -1,992 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "apply.h"
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include <upstream/jupiter.h>
-# include <upstream/upstreamapply.h>
-# include <upstream/upstreamcache.h>
-# include <zencore/compactbinary.h>
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/compactbinarypackage.h>
-# include <zencore/compress.h>
-# include <zencore/except.h>
-# include <zencore/filesystem.h>
-# include <zencore/fmtutils.h>
-# include <zencore/iobuffer.h>
-# include <zencore/iohash.h>
-# include <zencore/scopeguard.h>
-# include <zenstore/cas.h>
-# include <zenstore/cidstore.h>
-
-# include <zencore/windows.h>
-ZEN_THIRD_PARTY_INCLUDES_START
-# include <AccCtrl.h>
-# include <AclAPI.h>
-# include <UserEnv.h>
-# include <sddl.h>
-# pragma comment(lib, "UserEnv.lib")
-# include <atlbase.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-# include <filesystem>
-# include <span>
-
-using namespace std::literals;
-
-namespace zen {
-
-struct BasicFunctionJob
-{
-public:
- BasicFunctionJob() = default;
- ~BasicFunctionJob();
-
- void SetWorkingDirectory(const std::filesystem::path& WorkingDirectory) { m_WorkingDirectory = WorkingDirectory; }
- bool SpawnJob(std::filesystem::path ExePath, std::wstring CommandLine);
- bool Wait(uint32_t TimeoutMs = ~0);
- int ExitCode();
-
-private:
- std::filesystem::path m_WorkingDirectory;
- int m_ProcessId = 0;
- CHandle m_ProcessHandle;
-};
-
-BasicFunctionJob::~BasicFunctionJob()
-{
- Wait();
-}
-
-bool
-BasicFunctionJob::SpawnJob(std::filesystem::path ExePath, std::wstring CommandLine)
-{
- STARTUPINFOEX StartupInfo = {sizeof(STARTUPINFOEX)};
- PROCESS_INFORMATION ProcessInfo{};
-
- std::wstring ExePathNative = ExePath.native();
- std::wstring WorkingDirNative = m_WorkingDirectory.native();
-
- BOOL Created = ::CreateProcess(ExePathNative.data() /* ApplicationName */,
- CommandLine.data() /* Command Line */,
- nullptr /* Process Attributes */,
- nullptr /* Security Attributes */,
- FALSE /* InheritHandles */,
- 0 /* Flags */,
- nullptr /* Environment */,
- WorkingDirNative.data() /* Current Directory */,
- (LPSTARTUPINFO)&StartupInfo,
- &ProcessInfo);
-
- if (!Created)
- {
- throw std::system_error(::GetLastError(), std::system_category(), fmt::format("Failed to create process '{}'", ExePath).c_str());
- }
-
- m_ProcessId = ProcessInfo.dwProcessId;
- m_ProcessHandle.Attach(ProcessInfo.hProcess);
- ::CloseHandle(ProcessInfo.hThread);
-
- ZEN_INFO("Created process {}", m_ProcessId);
-
- return true;
-}
-
-bool
-BasicFunctionJob::Wait(uint32_t TimeoutMs)
-{
- if (!m_ProcessHandle)
- {
- return true;
- }
-
- DWORD WaitResult = WaitForSingleObject(m_ProcessHandle, TimeoutMs);
-
- if (WaitResult == WAIT_TIMEOUT)
- {
- return false;
- }
-
- if (WaitResult == WAIT_OBJECT_0)
- {
- return true;
- }
-
- throw std::runtime_error("Failed wait on process handle");
-}
-
-int
-BasicFunctionJob::ExitCode()
-{
- DWORD Ec = 0;
- BOOL Success = GetExitCodeProcess(m_ProcessHandle, &Ec);
-
- if (!Success)
- {
- ZEN_WARN("failed getting exit code");
- }
-
- if (Ec == STILL_ACTIVE)
- {
- ZEN_WARN("getting exit code but process is STILL_ACTIVE");
- }
-
- return gsl::narrow_cast<int>(Ec);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct SandboxedFunctionJob
-{
- SandboxedFunctionJob() = default;
- ~SandboxedFunctionJob() = default;
-
- void SetWorkingDirectory(const std::filesystem::path& WorkingDirectory) { m_WorkingDirectory = WorkingDirectory; }
- void Initialize(std::string_view AppContainerId);
- bool SpawnJob(std::filesystem::path ExePath);
- void AddWhitelistFile(const std::filesystem::path& FilePath) { m_WhitelistFiles.push_back(FilePath); }
-
-private:
- bool GrantNamedObjectAccess(PWSTR Name, SE_OBJECT_TYPE Type, ACCESS_MASK AccessMask, bool Recursive);
-
- std::filesystem::path m_WorkingDirectory;
- std::vector<std::filesystem::path> m_WhitelistFiles;
- std::vector<std::wstring> m_WhitelistRegistryKeys;
- PSID m_AppContainerSid = nullptr;
- bool m_IsInitialized = false;
-};
-
-bool
-SandboxedFunctionJob::GrantNamedObjectAccess(PWSTR ObjectName, SE_OBJECT_TYPE ObjectType, ACCESS_MASK AccessMask, bool Recursive)
-{
- DWORD Status;
- PACL NewAcl = nullptr;
-
- DWORD grfInhericance = 0;
-
- if (Recursive)
- {
- grfInhericance = OBJECT_INHERIT_ACE | CONTAINER_INHERIT_ACE;
- }
-
- EXPLICIT_ACCESS Access{.grfAccessPermissions = AccessMask,
- .grfAccessMode = GRANT_ACCESS,
- .grfInheritance = grfInhericance,
- .Trustee = {.pMultipleTrustee = nullptr,
- .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE,
- .TrusteeForm = TRUSTEE_IS_SID,
- .TrusteeType = TRUSTEE_IS_GROUP,
- .ptstrName = (PWSTR)m_AppContainerSid}};
-
- PACL OldAcl = nullptr;
-
- Status = GetNamedSecurityInfo(ObjectName /* ObjectName */,
- ObjectType /* ObjectType */,
- DACL_SECURITY_INFORMATION /* SecurityInfo */,
- nullptr /* ppsidOwner */,
- nullptr /* ppsidGroup */,
- &OldAcl /* ppDacl */,
- nullptr /* ppSacl */,
- nullptr /* ppSecurityDescriptor */);
- if (Status != ERROR_SUCCESS)
- return false;
-
- Status = SetEntriesInAcl(1 /* CountOfExplicitEntries */, &Access /* pListOfExplicitEntries */, OldAcl, &NewAcl);
- if (Status != ERROR_SUCCESS)
- return false;
-
- Status = SetNamedSecurityInfo(ObjectName /* ObjectName */,
- ObjectType /* ObjectType */,
- DACL_SECURITY_INFORMATION /*SecurityInfo */,
- nullptr /* psidOwner */,
- nullptr /* psidGroup */,
- NewAcl /* pDacl */,
- nullptr /* pSacl */);
- if (NewAcl)
- ::LocalFree(NewAcl);
-
- return Status == ERROR_SUCCESS;
-}
-
-void
-SandboxedFunctionJob::Initialize(std::string_view AppContainerId)
-{
- if (m_IsInitialized)
- {
- return;
- }
-
- std::wstring ContainerName = zen::Utf8ToWide(AppContainerId);
-
- HRESULT hRes = ::CreateAppContainerProfile(ContainerName.c_str(),
- ContainerName.c_str() /* Display Name */,
- ContainerName.c_str() /* Description */,
- nullptr /* Capabilities */,
- 0 /* Capability Count */,
- &m_AppContainerSid);
-
- if (FAILED(hRes))
- {
- hRes = ::DeriveAppContainerSidFromAppContainerName(ContainerName.c_str(), &m_AppContainerSid);
-
- if (FAILED(hRes))
- {
- ZEN_ERROR("Failed creating app container SID");
- }
- }
-
- // Debugging context
-
- PWSTR Str = nullptr;
- ::ConvertSidToStringSid(m_AppContainerSid, &Str);
-
- ZEN_INFO("AppContainer SID : '{}'", WideToUtf8(Str));
-
- PWSTR Path = nullptr;
- if (SUCCEEDED(::GetAppContainerFolderPath(Str, &Path)))
- {
- ZEN_INFO("AppContainer folder: '{}'", WideToUtf8(Path));
-
- ::CoTaskMemFree(Path);
- }
- ::LocalFree(Str);
-
- m_IsInitialized = true;
-}
-
-bool
-SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath)
-{
- // Build process attributes
-
- SECURITY_CAPABILITIES Sc = {0};
- Sc.AppContainerSid = m_AppContainerSid;
-
- STARTUPINFOEX StartupInfo = {sizeof(STARTUPINFOEX)};
- PROCESS_INFORMATION ProcessInfo{};
- SIZE_T Size = 0;
-
- ::InitializeProcThreadAttributeList(nullptr, 1, 0, &Size);
-
- auto AttrBuffer = std::make_unique<uint8_t[]>(Size);
- StartupInfo.lpAttributeList = reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(AttrBuffer.get());
-
- if (!::InitializeProcThreadAttributeList(StartupInfo.lpAttributeList, 1, 0, &Size))
- {
- return false;
- }
-
- if (!::UpdateProcThreadAttribute(StartupInfo.lpAttributeList,
- 0,
- PROC_THREAD_ATTRIBUTE_SECURITY_CAPABILITIES,
- &Sc,
- sizeof Sc,
- nullptr,
- nullptr))
- {
- return false;
- }
-
- // Set up security for files/folders/registry
-
- for (const std::filesystem::path& File : m_WhitelistFiles)
- {
- std::wstring NativeFileName = File.native();
- GrantNamedObjectAccess(NativeFileName.data(), SE_FILE_OBJECT, FILE_ALL_ACCESS, true);
- }
-
- for (std::wstring& RegKey : m_WhitelistRegistryKeys)
- {
- GrantNamedObjectAccess(RegKey.data(), SE_REGISTRY_WOW64_32KEY, KEY_ALL_ACCESS, true);
- }
-
- std::wstring ExePathNative = ExePath.native();
- std::wstring WorkingDirNative = m_WorkingDirectory.native();
-
- BOOL Created = ::CreateProcess(nullptr /* ApplicationName */,
- ExePathNative.data() /* Command line */,
- nullptr /* Process Attributes */,
- nullptr /* Security Attributes */,
- FALSE /* InheritHandles */,
- EXTENDED_STARTUPINFO_PRESENT | CREATE_NEW_CONSOLE /* Flags */,
- nullptr /* Environment */,
- WorkingDirNative.data() /* Current Directory */,
- (LPSTARTUPINFO)&StartupInfo,
- &ProcessInfo);
-
- DeleteProcThreadAttributeList(StartupInfo.lpAttributeList);
-
- if (!Created)
- {
- return false;
- }
-
- ZEN_INFO("Created process {}", ProcessInfo.dwProcessId);
-
- return true;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-HttpFunctionService::HttpFunctionService(CasStore& Store,
- CidStore& InCidStore,
- const std::filesystem::path& BaseDir,
- const CloudCacheClientOptions& ComputeOptions,
- const CloudCacheClientOptions& StorageOptions,
- const UpstreamAuthConfig& ComputeAuthConfig,
- const UpstreamAuthConfig& StorageAuthConfig,
- AuthMgr& Mgr)
-: m_Log(logging::Get("apply"))
-, m_CasStore(Store)
-, m_CidStore(InCidStore)
-, m_SandboxPath(BaseDir / "scratch")
-, m_FunctionPath(BaseDir / "func")
-{
- m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore);
-
- auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions,
- ComputeAuthConfig,
- StorageOptions,
- StorageAuthConfig,
- m_CasStore,
- m_CidStore,
- Mgr);
- m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
- m_UpstreamApply->Initialize();
-
- m_Router.AddPattern("job", "([[:digit:]]+)");
- m_Router.AddPattern("worker", "([[:xdigit:]]{40})");
- m_Router.AddPattern("action", "([[:xdigit:]]{40})");
-
- m_Router.RegisterRoute(
- "workers/{worker}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- {
- RwLock::SharedLockScope _(m_WorkerLock);
-
- if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- const WorkerDesc& Desc = It->second;
- return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor);
- }
- }
- break;
-
- case HttpVerb::kPost:
- {
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- CbObject FunctionSpec = HttpReq.ReadPayloadObject();
-
- // Determine which pieces are missing and need to be transmitted to populate CAS
-
- CasChunkSet ChunkSet;
-
- FunctionSpec.IterateAttachments([&](CbFieldView Field) {
- const IoHash Hash = Field.AsHash();
- ChunkSet.AddChunkToSet(Hash);
- });
-
- // Note that we store executables uncompressed to make it
- // more straightforward and efficient to materialize them, hence
- // the CAS lookup here instead of CID for the input payloads
-
- m_CasStore.FilterChunks(ChunkSet);
-
- if (ChunkSet.IsEmpty())
- {
- RwLock::ExclusiveLockScope _(m_WorkerLock);
-
- m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec});
-
- ZEN_DEBUG("worker {}: all attachments already available", WorkerId);
-
- return HttpReq.WriteResponse(HttpResponseCode::NoContent);
- }
- else
- {
- CbObjectWriter ResponseWriter;
- ResponseWriter.BeginArray("need");
-
- ChunkSet.IterateChunks([&](const IoHash& Hash) {
- ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
-
- ResponseWriter.AddHash(Hash);
- });
-
- ResponseWriter.EndArray();
-
- ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize());
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
- }
- }
- break;
-
- case HttpContentType::kCbPackage:
- {
- CbPackage FunctionSpec = HttpReq.ReadPayloadPackage();
-
- CbObject Obj = FunctionSpec.GetObject();
-
- std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments();
-
- int AttachmentCount = 0;
- int NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- ZEN_ASSERT(Attachment.IsCompressedBinary());
-
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
- SharedBuffer Decompressed = DataView.Decompress();
- const uint64_t DecompressedSize = DataView.GetRawSize();
-
- ZEN_UNUSED(DataHash);
-
- TotalAttachmentBytes += DecompressedSize;
- ++AttachmentCount;
-
- // Note that we store executables uncompressed to make it
- // more straightforward and efficient to materialize them
-
- const CasStore::InsertResult InsertResult =
- m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash()));
-
- if (InsertResult.New)
- {
- TotalNewBytes += DecompressedSize;
- ++NewAttachmentCount;
- }
- }
-
- ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments",
- WorkerId,
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
-
- RwLock::ExclusiveLockScope _(m_WorkerLock);
-
- m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj});
-
- return HttpReq.WriteResponse(HttpResponseCode::NoContent);
- }
- break;
-
- default:
- break;
- }
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "jobs/{job}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- break;
-
- case HttpVerb::kPost:
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "jobs/{worker}/{action}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
- const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2));
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- {
- CbPackage Output;
- HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output);
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
- break;
- }
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "jobs/{worker}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
-
- WorkerDesc Worker;
-
- {
- RwLock::SharedLockScope _(m_WorkerLock);
-
- if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- Worker = It->second;
- }
- }
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- // TODO: return status of all pending or executing jobs
- break;
-
- case HttpVerb::kPost:
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- // This operation takes the proposed job spec and identifies which
- // chunks are not present on this server. This list is then returned in
- // the "need" list in the response
-
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject RequestObject = LoadCompactBinaryObject(Payload);
-
- std::vector<IoHash> NeedList;
-
- RequestObject.IterateAttachments([&](CbFieldView Field) {
- const IoHash FileHash = Field.AsHash();
-
- if (!m_CidStore.ContainsChunk(FileHash))
- {
- NeedList.push_back(FileHash);
- }
- });
-
- if (NeedList.empty())
- {
- // We already have everything
- CbObject Output;
- HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output);
-
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
-
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
-
- Cbo.EndArray();
- CbObject Response = Cbo.Save();
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
- }
- break;
-
- case HttpContentType::kCbPackage:
- {
- CbPackage Action = HttpReq.ReadPayloadPackage();
- CbObject ActionObj = Action.GetObject();
-
- std::span<const CbAttachment> Attachments = Action.GetAttachments();
-
- int AttachmentCount = 0;
- int NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- ZEN_ASSERT(Attachment.IsCompressedBinary());
-
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
-
- const uint64_t CompressedSize = DataView.GetCompressedSize();
-
- TotalAttachmentBytes += CompressedSize;
- ++AttachmentCount;
-
- const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView);
-
- if (InsertResult.New)
- {
- TotalNewBytes += CompressedSize;
- ++NewAttachmentCount;
- }
- }
-
- ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)",
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
-
- CbObject Output;
- HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output);
-
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
- break;
-
- default:
- break;
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kPost);
-}
-
-HttpFunctionService::~HttpFunctionService()
-{
-}
-
-const char*
-HttpFunctionService::BaseUri() const
-{
- return "/apply/";
-}
-
-void
-HttpFunctionService::HandleRequest(HttpServerRequest& Request)
-{
- if (m_Router.HandleRequest(Request) == false)
- {
- ZEN_WARN("No route found for {0}", Request.RelativeUri());
- }
-}
-
-std::filesystem::path
-HttpFunctionService::CreateNewSandbox()
-{
- std::string UniqueId = std::to_string(++m_SandboxCount);
- std::filesystem::path Path = m_SandboxPath / UniqueId;
- zen::CreateDirectories(Path);
- return Path;
-}
-
-CbPackage
-HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
-{
- using namespace std::literals;
-
- std::filesystem::path SandboxPath = CreateNewSandbox();
-
- CbObject Desc = Worker.Descriptor;
-
- // Manifest worker in Sandbox
-
- for (auto& It : Desc["executables"])
- {
- CbObjectView ExecEntry = It.AsObjectView();
-
- std::string_view Name = ExecEntry["name"sv].AsString();
- const IoHash ChunkHash = ExecEntry["hash"sv].AsHash();
- const uint64_t Size = ExecEntry["size"sv].AsUInt64();
-
- std::filesystem::path FilePath{SandboxPath / Name};
- IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash);
-
- if (!DataBuffer)
- {
- throw std::runtime_error(fmt::format("worker CAS chunk '{}' missing", ChunkHash));
- }
-
- if (DataBuffer.Size() != Size)
- {
- throw std::runtime_error(
- fmt::format("worker CAS chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size));
- }
-
- zen::WriteFile(FilePath, DataBuffer);
- }
-
- for (auto& It : Desc["dirs"])
- {
- std::string_view Name = It.AsString();
- std::filesystem::path DirPath{SandboxPath / Name};
- zen::CreateDirectories(DirPath);
- }
-
- for (auto& It : Desc["files"])
- {
- CbObjectView FileEntry = It.AsObjectView();
-
- std::string_view Name = FileEntry["name"sv].AsString();
- const IoHash ChunkHash = FileEntry["hash"sv].AsHash();
- const uint64_t Size = FileEntry["size"sv].AsUInt64();
-
- std::filesystem::path FilePath{SandboxPath / Name};
- IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash);
-
- if (!DataBuffer)
- {
- throw std::runtime_error(fmt::format("worker CAS chunk '{}' missing", ChunkHash));
- }
-
- if (DataBuffer.Size() != Size)
- {
- throw std::runtime_error(
- fmt::format("worker CAS chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size));
- }
-
- zen::WriteFile(FilePath, DataBuffer);
- }
-
- // Write out action
-
- zen::WriteFile(SandboxPath / "build.action", Action.GetBuffer().AsIoBuffer());
-
- // Manifest inputs in sandbox
-
- Action.IterateAttachments([&](CbFieldView Field) {
- const IoHash Cid = Field.AsHash();
- std::filesystem::path FilePath{SandboxPath / "Inputs" / Cid.ToHexString()};
- IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);
-
- if (!DataBuffer)
- {
- throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid));
- }
-
- zen::WriteFile(FilePath, DataBuffer);
- });
-
- // Set up environment variables
-
- StringBuilder<1024> EnvironmentBlock;
-
- for (auto& It : Desc["environment"])
- {
- EnvironmentBlock.Append(It.AsString());
- EnvironmentBlock.Append('\0');
- }
- EnvironmentBlock.Append('\0');
- EnvironmentBlock.Append('\0');
-
- // Execute process
-
- std::string_view ExecPath = Desc["path"].AsString();
- std::filesystem::path ExePath = SandboxPath / ExecPath;
-
- WideStringBuilder<512> CommandLine;
- CommandLine.Append(L'"');
- CommandLine.Append(ExePath.c_str());
- CommandLine.Append(L'"');
- CommandLine.Append(L" -Build=build.action");
-
- LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr;
- LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr;
- BOOL bInheritHandles = FALSE;
- DWORD dwCreationFlags = 0;
-
- STARTUPINFO StartupInfo{};
- StartupInfo.cb = sizeof StartupInfo;
-
- PROCESS_INFORMATION ProcessInformation{};
-
- BOOL Success = CreateProcessW(nullptr,
- CommandLine.Data(),
- lpProcessAttributes,
- lpThreadAttributes,
- bInheritHandles,
- dwCreationFlags,
- (LPVOID)EnvironmentBlock.Data(), // Environment block
- SandboxPath.c_str(), // Current directory
- &StartupInfo,
- /* out */ &ProcessInformation);
-
- if (!Success)
- {
- zen::ThrowLastError("Unable to launch process" /* TODO: Add context */);
- }
-
- CloseHandle(ProcessInformation.hThread);
- auto _ = MakeGuard([&] { CloseHandle(ProcessInformation.hProcess); });
-
- DWORD Result = WaitForSingleObject(ProcessInformation.hProcess, INFINITE);
-
- if (Result != WAIT_OBJECT_0)
- {
- zen::ThrowLastError("Process wait failed" /* TODO: Add context */);
- }
-
- DWORD ExitCode = 0;
- GetExitCodeProcess(ProcessInformation.hProcess, &ExitCode);
-
- // Gather outputs
-
- FileContents OutputData = zen::ReadFile(SandboxPath / "build.output");
-
- if (OutputData.ErrorCode)
- {
- throw std::system_error(OutputData.ErrorCode, "Failed to read build output file");
- }
-
- // TODO: should have a more straightforward way to perform this
- ZEN_ASSERT(OutputData.Data.size() == 1);
-
- CbPackage OutputPackage;
- CbObject Output = zen::LoadCompactBinaryObject(OutputData.Data[0]);
-
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalRawAttachmentBytes = 0;
-
- Output.IterateAttachments([&](CbFieldView Field) {
- IoHash Hash = Field.AsHash();
- std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()};
-
- FileContents ChunkData = zen::ReadFile(OutputPath);
-
- if (ChunkData.ErrorCode)
- {
- throw std::system_error(ChunkData.ErrorCode, "Failed to read build output chunk file");
- }
-
- ZEN_ASSERT(OutputData.Data.size() == 1);
-
- CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0]));
-
- if (!AttachmentBuffer)
- {
- throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)");
- }
-
- TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
- TotalRawAttachmentBytes += AttachmentBuffer.GetCompressedSize();
-
- CbAttachment Attachment(AttachmentBuffer);
- OutputPackage.AddAttachment(Attachment);
- });
-
- OutputPackage.SetObject(Output);
-
- ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)",
- OutputPackage.GetAttachments().size(),
- NiceBytes(TotalAttachmentBytes),
- NiceBytes(TotalRawAttachmentBytes));
-
- return OutputPackage;
-}
-
-HttpResponseCode
-HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object)
-{
- const IoHash WorkerId = Worker.Descriptor.GetHash();
- const IoHash ActionId = Action.GetHash();
-
- Action.MakeOwned();
-
- ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString());
-
- auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)});
-
- if (!EnqueueResult.Success)
- {
- ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString());
- return HttpResponseCode::InternalServerError;
- }
-
- CbObjectWriter Writer;
- Writer.AddHash("worker", WorkerId);
- Writer.AddHash("action", ActionId);
-
- Object = std::move(Writer.Save());
- return HttpResponseCode::OK;
-}
-
-HttpResponseCode
-HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package)
-{
- auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId);
- if (!Status.Success)
- {
- // throw std::runtime_error(fmt::format("Action {}/{} not found", WorkerId.ToHexString(), ActionId.ToHexString()).c_str());
- return HttpResponseCode::NotFound;
- }
-
- if (Status.Status.State != UpstreamApplyState::Complete)
- {
- return HttpResponseCode::Accepted;
- }
-
- GetUpstreamApplyResult& Completed = Status.Status.Result;
- if (!Completed.Success || Completed.Error.ErrorCode != 0)
- {
- ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}",
- WorkerId.ToHexString(),
- ActionId.ToHexString(),
- Completed.StdOut,
- Completed.StdErr,
- Completed.Error.Reason,
- Completed.Error.ErrorCode);
-
- return HttpResponseCode::InternalServerError;
- }
-
- ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)",
- WorkerId.ToHexString(),
- ActionId.ToHexString(),
- Completed.OutputPackage.GetAttachments().size(),
- NiceBytes(Completed.TotalAttachmentBytes),
- NiceBytes(Completed.TotalRawAttachmentBytes));
-
- Package = std::move(Completed.OutputPackage);
- return HttpResponseCode::OK;
-}
-
-} // namespace zen
-
-#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp
new file mode 100644
index 000000000..9af3efcec
--- /dev/null
+++ b/zenserver/compute/function.cpp
@@ -0,0 +1,473 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "function.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <upstream/jupiter.h>
+# include <upstream/upstreamapply.h>
+# include <upstream/upstreamcache.h>
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/scopeguard.h>
+# include <zenstore/cas.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+using namespace std::literals;
+
+namespace zen {
+
+HttpFunctionService::HttpFunctionService(CasStore& Store,
+ CidStore& InCidStore,
+ const CloudCacheClientOptions& ComputeOptions,
+ const CloudCacheClientOptions& StorageOptions,
+ const UpstreamAuthConfig& ComputeAuthConfig,
+ const UpstreamAuthConfig& StorageAuthConfig,
+ AuthMgr& Mgr)
+: m_Log(logging::Get("apply"))
+, m_CasStore(Store)
+, m_CidStore(InCidStore)
+{
+ m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore);
+
+ auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions,
+ ComputeAuthConfig,
+ StorageOptions,
+ StorageAuthConfig,
+ m_CasStore,
+ m_CidStore,
+ Mgr);
+ m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
+ m_UpstreamApply->Initialize();
+
+ m_Router.AddPattern("job", "([[:digit:]]+)");
+ m_Router.AddPattern("worker", "([[:xdigit:]]{40})");
+ m_Router.AddPattern("action", "([[:xdigit:]]{40})");
+
+ m_Router.RegisterRoute(
+ "workers/{worker}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ RwLock::SharedLockScope _(m_WorkerLock);
+
+ if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ else
+ {
+ const WorkerDesc& Desc = It->second;
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor);
+ }
+ }
+ break;
+
+ case HttpVerb::kPost:
+ {
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ CbObject FunctionSpec = HttpReq.ReadPayloadObject();
+
+ // Determine which pieces are missing and need to be transmitted to populate CAS
+
+ CasChunkSet ChunkSet;
+
+ FunctionSpec.IterateAttachments([&](CbFieldView Field) {
+ const IoHash Hash = Field.AsHash();
+ ChunkSet.AddChunkToSet(Hash);
+ });
+
+ // Note that we store executables uncompressed to make it
+ // more straightforward and efficient to materialize them, hence
+ // the CAS lookup here instead of CID for the input payloads
+
+ m_CasStore.FilterChunks(ChunkSet);
+
+ if (ChunkSet.IsEmpty())
+ {
+ RwLock::ExclusiveLockScope _(m_WorkerLock);
+
+ m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec});
+
+ ZEN_DEBUG("worker {}: all attachments already available", WorkerId);
+
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+ }
+ else
+ {
+ CbObjectWriter ResponseWriter;
+ ResponseWriter.BeginArray("need");
+
+ ChunkSet.IterateChunks([&](const IoHash& Hash) {
+ ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
+
+ ResponseWriter.AddHash(Hash);
+ });
+
+ ResponseWriter.EndArray();
+
+ ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize());
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
+ }
+ }
+ break;
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage FunctionSpec = HttpReq.ReadPayloadPackage();
+
+ CbObject Obj = FunctionSpec.GetObject();
+
+ std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments();
+
+ int AttachmentCount = 0;
+ int NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
+ SharedBuffer Decompressed = DataView.Decompress();
+ const uint64_t DecompressedSize = DataView.GetRawSize();
+
+ ZEN_UNUSED(DataHash);
+
+ TotalAttachmentBytes += DecompressedSize;
+ ++AttachmentCount;
+
+ // Note that we store executables uncompressed to make it
+ // more straightforward and efficient to materialize them
+
+ const CasStore::InsertResult InsertResult =
+ m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash()));
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += DecompressedSize;
+ ++NewAttachmentCount;
+ }
+ }
+
+ ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments",
+ WorkerId,
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
+
+ RwLock::ExclusiveLockScope _(m_WorkerLock);
+
+ m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj});
+
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+ }
+ break;
+
+ default:
+ break;
+ }
+ }
+ break;
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "jobs/{job}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ break;
+
+ case HttpVerb::kPost:
+ break;
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "jobs/{worker}/{action}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+ const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2));
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ CbPackage Output;
+ HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output);
+ if (ResponseCode != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(ResponseCode);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+ }
+ break;
+ }
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "jobs/{worker}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+ WorkerDesc Worker;
+
+ {
+ RwLock::SharedLockScope _(m_WorkerLock);
+
+ if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ else
+ {
+ Worker = It->second;
+ }
+ }
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ // TODO: return status of all pending or executing jobs
+ break;
+
+ case HttpVerb::kPost:
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ // This operation takes the proposed job spec and identifies which
+ // chunks are not present on this server. This list is then returned in
+ // the "need" list in the response
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+ CbObject RequestObject = LoadCompactBinaryObject(Payload);
+
+ std::vector<IoHash> NeedList;
+
+ RequestObject.IterateAttachments([&](CbFieldView Field) {
+ const IoHash FileHash = Field.AsHash();
+
+ if (!m_CidStore.ContainsChunk(FileHash))
+ {
+ NeedList.push_back(FileHash);
+ }
+ });
+
+ if (NeedList.empty())
+ {
+ // We already have everything
+ CbObject Output;
+ HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output);
+
+ if (ResponseCode != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(ResponseCode);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+ CbObject Response = Cbo.Save();
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
+ }
+ break;
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage Action = HttpReq.ReadPayloadPackage();
+ CbObject ActionObj = Action.GetObject();
+
+ std::span<const CbAttachment> Attachments = Action.GetAttachments();
+
+ int AttachmentCount = 0;
+ int NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
+
+ ZEN_UNUSED(DataHash);
+
+ const uint64_t CompressedSize = DataView.GetCompressedSize();
+
+ TotalAttachmentBytes += CompressedSize;
+ ++AttachmentCount;
+
+ const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += CompressedSize;
+ ++NewAttachmentCount;
+ }
+ }
+
+ ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)",
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
+
+ CbObject Output;
+ HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output);
+
+ if (ResponseCode != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(ResponseCode);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+ }
+ break;
+
+ default:
+ break;
+ }
+ break;
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kPost);
+}
+
+HttpFunctionService::~HttpFunctionService()
+{
+}
+
+const char*
+HttpFunctionService::BaseUri() const
+{
+ return "/apply/";
+}
+
+void
+HttpFunctionService::HandleRequest(HttpServerRequest& Request)
+{
+ if (m_Router.HandleRequest(Request) == false)
+ {
+ ZEN_WARN("No route found for {0}", Request.RelativeUri());
+ }
+}
+
+HttpResponseCode
+HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object)
+{
+ const IoHash WorkerId = Worker.Descriptor.GetHash();
+ const IoHash ActionId = Action.GetHash();
+
+ Action.MakeOwned();
+
+ ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString());
+
+ auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)});
+
+ if (!EnqueueResult.Success)
+ {
+ ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString());
+ return HttpResponseCode::InternalServerError;
+ }
+
+ CbObjectWriter Writer;
+ Writer.AddHash("worker", WorkerId);
+ Writer.AddHash("action", ActionId);
+
+ Object = Writer.Save();
+ return HttpResponseCode::OK;
+}
+
+HttpResponseCode
+HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package)
+{
+ auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId);
+ if (!Status.Success)
+ {
+ // throw std::runtime_error(fmt::format("Action {}/{} not found", WorkerId.ToHexString(), ActionId.ToHexString()).c_str());
+ return HttpResponseCode::NotFound;
+ }
+
+ if (Status.Status.State != UpstreamApplyState::Complete)
+ {
+ return HttpResponseCode::Accepted;
+ }
+
+ GetUpstreamApplyResult& Completed = Status.Status.Result;
+ if (!Completed.Success || Completed.Error.ErrorCode != 0)
+ {
+ ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}",
+ WorkerId.ToHexString(),
+ ActionId.ToHexString(),
+ Completed.StdOut,
+ Completed.StdErr,
+ Completed.Error.Reason,
+ Completed.Error.ErrorCode);
+
+ return HttpResponseCode::InternalServerError;
+ }
+
+ ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)",
+ WorkerId.ToHexString(),
+ ActionId.ToHexString(),
+ Completed.OutputPackage.GetAttachments().size(),
+ NiceBytes(Completed.TotalAttachmentBytes),
+ NiceBytes(Completed.TotalRawAttachmentBytes));
+
+ Package = std::move(Completed.OutputPackage);
+ return HttpResponseCode::OK;
+}
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/zenserver/compute/apply.h b/zenserver/compute/function.h
index e00afcd61..962927f1c 100644
--- a/zenserver/compute/apply.h
+++ b/zenserver/compute/function.h
@@ -37,7 +37,6 @@ class HttpFunctionService : public HttpService
public:
HttpFunctionService(CasStore& Store,
CidStore& InCidStore,
- const std::filesystem::path& BaseDir,
const CloudCacheClientOptions& ComputeOptions,
const CloudCacheClientOptions& StorageOptions,
const UpstreamAuthConfig& ComputeAuthConfig,
@@ -54,9 +53,6 @@ private:
HttpRequestRouter m_Router;
CasStore& m_CasStore;
CidStore& m_CidStore;
- std::filesystem::path m_SandboxPath;
- std::filesystem::path m_FunctionPath;
- std::atomic<int> m_SandboxCount{0};
std::unique_ptr<UpstreamApply> m_UpstreamApply;
struct WorkerDesc
@@ -64,10 +60,8 @@ private:
CbObject Descriptor;
};
- [[nodiscard]] std::filesystem::path CreateNewSandbox();
- [[nodiscard]] CbPackage ExecAction(const WorkerDesc& Worker, CbObject Action);
- [[nodiscard]] HttpResponseCode ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object);
- [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package);
+ [[nodiscard]] HttpResponseCode ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object);
+ [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package);
RwLock m_WorkerLock;
std::unordered_map<IoHash, WorkerDesc> m_WorkerMap;
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index adb079d83..b7fc18b4e 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -652,6 +652,11 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio
}
}
+ if (sol::optional<sol::table> ExecConfig = lua["exec"])
+ {
+ ServerOptions.ExecServiceEnabled = ExecConfig->get_or("enable", ServerOptions.ExecServiceEnabled);
+ }
+
if (sol::optional<sol::table> ComputeConfig = lua["compute"])
{
ServerOptions.ComputeServiceEnabled = ComputeConfig->get_or("enable", ServerOptions.ComputeServiceEnabled);
diff --git a/zenserver/config.h b/zenserver/config.h
index a7a7815a8..a61a7f89f 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -15,10 +15,6 @@
# define ZEN_USE_NAMED_PIPES 0
#endif
-#ifndef ZEN_USE_EXEC
-# define ZEN_USE_EXEC 0
-#endif
-
struct ZenUpstreamJupiterConfig
{
std::string Name;
@@ -120,6 +116,7 @@ struct ZenServerOptions
bool IsTest = false;
bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
bool StructuredCacheEnabled = true;
+ bool ExecServiceEnabled = true;
bool ComputeServiceEnabled = true;
bool ShouldCrash = false; // Option for testing crash handling
bool IsFirstRun = false;
diff --git a/zenserver/testing/launch.cpp b/zenserver/testing/launch.cpp
index f315ec1b4..1236e6adb 100644
--- a/zenserver/testing/launch.cpp
+++ b/zenserver/testing/launch.cpp
@@ -2,7 +2,7 @@
#include "launch.h"
-#if ZEN_WITH_COMPUTE_SERVICES
+#if ZEN_WITH_EXEC_SERVICES
# include <zencore/compactbinary.h>
# include <zencore/compactbinarybuilder.h>
@@ -167,10 +167,10 @@ SandboxedJob::GrantNamedObjectAccess(PWSTR ObjectName, SE_OBJECT_TYPE ObjectType
.grfAccessMode = GRANT_ACCESS,
.grfInheritance = grfInhericance,
.Trustee = {.pMultipleTrustee = nullptr,
- .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE,
- .TrusteeForm = TRUSTEE_IS_SID,
- .TrusteeType = TRUSTEE_IS_GROUP,
- .ptstrName = (PWSTR)m_AppContainerSid}};
+ .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE,
+ .TrusteeForm = TRUSTEE_IS_SID,
+ .TrusteeType = TRUSTEE_IS_GROUP,
+ .ptstrName = (PWSTR)m_AppContainerSid}};
PACL OldAcl = nullptr;
@@ -548,4 +548,4 @@ HttpLaunchService::CreateNewSandbox()
} // namespace zen
-#endif // ZEN_WITH_COMPUTE_SERVICES
+#endif // ZEN_WITH_EXEC_SERVICES
diff --git a/zenserver/testing/launch.h b/zenserver/testing/launch.h
index 925fa18b0..6fd3e39ae 100644
--- a/zenserver/testing/launch.h
+++ b/zenserver/testing/launch.h
@@ -4,11 +4,11 @@
#include <zencore/zencore.h>
-#if !defined(ZEN_WITH_COMPUTE_SERVICES)
-# define ZEN_WITH_COMPUTE_SERVICES ZEN_PLATFORM_WINDOWS
+#if !defined(ZEN_WITH_EXEC_SERVICES)
+# define ZEN_WITH_EXEC_SERVICES ZEN_PLATFORM_WINDOWS
#endif
-#if ZEN_WITH_COMPUTE_SERVICES
+#if ZEN_WITH_EXEC_SERVICES
# include <zencore/logging.h>
# include <zenhttp/httpserver.h>
@@ -45,4 +45,4 @@ private:
} // namespace zen
-#endif // ZEN_WITH_COMPUTE_SERVICES
+#endif // ZEN_WITH_EXEC_SERVICES
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index 918697224..fd304adb8 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -59,9 +59,9 @@ namespace detail {
CidStore& CidStore,
AuthMgr& Mgr)
: m_Log(logging::Get("upstream-apply"))
- , m_AuthMgr(Mgr)
, m_CasStore(CasStore)
, m_CidStore(CidStore)
+ , m_AuthMgr(Mgr)
{
m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl);
m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString());
@@ -434,9 +434,6 @@ namespace detail {
CbObjectView Status = It.AsObjectView();
const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32();
- const std::string_view AgentId = TaskStatus["a"sv].AsString();
- const std::string_view LeaseId = TaskStatus["l"sv].AsString();
-
// Only care about completed tasks
if (State != ComputeTaskState::Complete)
{
@@ -486,10 +483,10 @@ namespace detail {
private:
spdlog::logger& Log() { return m_Log; }
+ spdlog::logger& m_Log;
CasStore& m_CasStore;
CidStore& m_CidStore;
AuthMgr& m_AuthMgr;
- spdlog::logger& m_Log;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
RefPtr<CloudCacheClient> m_StorageClient;
@@ -532,11 +529,7 @@ namespace detail {
return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}", ComputeTaskOutcomeToString(Outcome))}};
}
- const IoHash TaskId = TaskStatus["h"sv].AsHash();
- const DateTime Time = TaskStatus["t"sv].AsDateTime();
- const IoHash ResultHash = TaskStatus["r"sv].AsHash();
- const std::string_view AgentId = TaskStatus["a"sv].AsString();
- const std::string_view LeaseId = TaskStatus["l"sv].AsString();
+ const IoHash ResultHash = TaskStatus["r"sv].AsHash();
int64_t Bytes{};
double ElapsedSeconds{};
@@ -828,7 +821,7 @@ namespace detail {
{
std::string_view Env = It.AsString();
auto Index = Env.find('=');
- if (Index < 0)
+ if (Index == std::string_view::npos)
{
Log().warn("process apply upstream FAILED, environment '{}' malformed", Env);
return false;
@@ -910,7 +903,7 @@ namespace detail {
}
if (Memory > 0)
{
- Resources["RAM"sv] = std::max(Memory / 1024 / 1024 / 1024, 1LL);
+ Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL);
}
CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive);
@@ -1063,7 +1056,7 @@ namespace detail {
DirectoryTreeWriter.EndArray();
}
- return std::move(DirectoryTreeWriter.Save());
+ return DirectoryTreeWriter.Save();
}
[[nodiscard]] CbObject BuildRequirements(const std::string_view Condition,
@@ -1085,7 +1078,7 @@ namespace detail {
Writer.EndArray();
}
Writer.AddBool("e", Exclusive);
- return std::move(Writer.Save());
+ return Writer.Save();
}
[[nodiscard]] CbObject BuildTask(const std::string_view Executable,
@@ -1141,7 +1134,7 @@ namespace detail {
TaskWriter.EndArray();
}
- return std::move(TaskWriter.Save());
+ return TaskWriter.Save();
}
};
} // namespace detail
@@ -1154,46 +1147,33 @@ struct UpstreamApplyStats
UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {}
- void Add(spdlog::logger& Logger,
- UpstreamApplyEndpoint& Endpoint,
- const PostUpstreamApplyResult& Result,
- const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+ void Add(UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result)
{
UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
if (Result.Error)
{
- Stats.ErrorCount++;
+ Stats.ErrorCount.Increment(1);
}
else if (Result.Success)
{
- Stats.PostCount++;
- Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
- Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
- }
-
- if (m_Enabled && m_SampleCount++ % MaxSampleCount)
- {
- Dump(Logger, Endpoints);
+ Stats.PostCount.Increment(1);
+ Stats.UpBytes.Increment(Result.Bytes / 1024 / 1024);
}
}
- void Add(spdlog::logger& Logger,
- UpstreamApplyEndpoint& Endpoint,
- const GetUpstreamApplyUpdatesResult& Result,
- const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+ void Add(UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result)
{
UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
if (Result.Error)
{
- Stats.ErrorCount++;
+ Stats.ErrorCount.Increment(1);
}
else if (Result.Success)
{
- Stats.UpdateCount++;
- Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
- Stats.SecondsDown.fetch_add(Result.ElapsedSeconds);
+ Stats.UpdateCount.Increment(1);
+ Stats.DownBytes.Increment(Result.Bytes / 1024 / 1024);
if (!Result.Completed.empty())
{
uint64_t Completed = 0;
@@ -1201,47 +1181,12 @@ struct UpstreamApplyStats
{
Completed += It.second.size();
}
- Stats.CompleteCount.fetch_add(Completed);
+ Stats.CompleteCount.Increment(Completed);
}
}
-
- if (m_Enabled && m_SampleCount++ % MaxSampleCount)
- {
- Dump(Logger, Endpoints);
- }
- }
-
- void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
- {
- for (auto& Ep : Endpoints)
- {
- // These stats will not be totally correct as the numbers are not captured atomically
-
- UpstreamApplyEndpointStats& Stats = Ep->Stats();
- const uint64_t PostCount = Stats.PostCount;
- const uint64_t CompleteCount = Stats.CompleteCount;
- // const uint64_t UpdateCount = Stats.UpdateCount;
- const double DownBytes = Stats.DownBytes;
- const double SecondsDown = Stats.SecondsDown;
- const double UpBytes = Stats.UpBytes;
- const double SecondsUp = Stats.SecondsUp;
-
- const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0;
- const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0;
- const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
-
- Logger.debug("STATS - '{}', Complete rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
- Ep->DisplayName(),
- CompleteRate,
- DownBytes,
- DownSpeed,
- UpBytes,
- UpSpeed);
- }
}
- bool m_Enabled;
- std::atomic_uint64_t m_SampleCount = {};
+ bool m_Enabled;
};
//////////////////////////////////////////////////////////////////////////
@@ -1364,19 +1309,19 @@ public:
Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
UpstreamApplyEndpointStats& Stats = Ep->Stats();
- const uint64_t PostCount = Stats.PostCount;
- const uint64_t CompleteCount = Stats.CompleteCount;
+ const uint64_t PostCount = Stats.PostCount.Value();
+ const uint64_t CompleteCount = Stats.CompleteCount.Value();
// const uint64_t UpdateCount = Stats.UpdateCount;
const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
Status << "post_count" << PostCount;
Status << "complete_count" << PostCount;
- Status << "update_count" << Stats.UpdateCount;
+ Status << "update_count" << Stats.UpdateCount.Value();
Status << "complete_ratio" << CompleteRate;
- Status << "downloaded_mb" << Stats.DownBytes;
- Status << "uploaded_mb" << Stats.UpBytes;
- Status << "error_count" << Stats.ErrorCount;
+ Status << "downloaded_mb" << Stats.DownBytes.Value();
+ Status << "uploaded_mb" << Stats.UpBytes.Value();
+ Status << "error_count" << Stats.ErrorCount.Value();
Status.EndObject();
}
@@ -1425,7 +1370,7 @@ private:
}
}
}
- m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ m_Stats.Add(*Endpoint, Result);
return;
}
}
@@ -1476,7 +1421,7 @@ private:
if (Endpoint->IsHealthy())
{
GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates();
- m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ m_Stats.Add(*Endpoint, Result);
if (!Result.Success)
{
diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h
index 44c08e30e..63fd771da 100644
--- a/zenserver/upstream/upstreamapply.h
+++ b/zenserver/upstream/upstreamapply.h
@@ -2,13 +2,12 @@
#pragma once
-#include "compute/apply.h"
-
#if ZEN_WITH_COMPUTE_SERVICES
# include <zencore/compactbinarypackage.h>
# include <zencore/iobuffer.h>
# include <zencore/iohash.h>
+# include <zencore/stats.h>
# include <zencore/zencore.h>
# include <atomic>
@@ -106,14 +105,12 @@ struct UpstreamEndpointHealth
struct UpstreamApplyEndpointStats
{
- std::atomic_uint64_t PostCount{};
- std::atomic_uint64_t CompleteCount{};
- std::atomic_uint64_t UpdateCount{};
- std::atomic_uint64_t ErrorCount{};
- std::atomic<double> UpBytes{};
- std::atomic<double> DownBytes{};
- std::atomic<double> SecondsUp{};
- std::atomic<double> SecondsDown{};
+ metrics::Counter PostCount;
+ metrics::Counter CompleteCount;
+ metrics::Counter UpdateCount;
+ metrics::Counter ErrorCount;
+ metrics::Counter UpBytes;
+ metrics::Counter DownBytes;
};
/**
diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua
index 569e3c150..2174ad679 100644
--- a/zenserver/xmake.lua
+++ b/zenserver/xmake.lua
@@ -32,6 +32,7 @@ target("zenserver")
add_options("vfs")
add_options("compute")
+ add_options("exec")
add_packages(
"vcpkg::asio",
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index d8e97b117..c9708dba8 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -104,7 +104,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include "auth/authservice.h"
#include "cache/structuredcache.h"
#include "cache/structuredcachestore.h"
-#include "compute/apply.h"
+#include "compute/function.h"
#include "diag/diagsvcs.h"
#include "experimental/usnjournal.h"
#include "frontend/frontend.h"
@@ -272,21 +272,26 @@ public:
m_LocalProjectService = zen::LocalProjectService::New(*m_CasStore, m_ProjectStore);
#endif
-#if ZEN_USE_EXEC
- std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox";
- zen::CreateDirectories(SandboxDir);
- m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir);
-#endif
+#if ZEN_WITH_EXEC_SERVICES
-#if ZEN_WITH_COMPUTE_SERVICES
- if (ServerOptions.ComputeServiceEnabled)
+ if (ServerOptions.ExecServiceEnabled)
{
- ZEN_INFO("instantiating compute services");
+ ZEN_INFO("instantiating exec service");
std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox";
zen::CreateDirectories(SandboxDir);
m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir);
+ }
+ else
+ {
+ ZEN_INFO("NOT instantiating exec services");
+ }
+#endif // ZEN_WITH_EXEC_SERVICES
+
+#if ZEN_WITH_COMPUTE_SERVICES
+ if (ServerOptions.ComputeServiceEnabled)
+ {
InitializeCompute(ServerOptions);
}
else
@@ -331,11 +336,18 @@ public:
m_Http->RegisterService(m_CasService);
+#if ZEN_WITH_EXEC_SERVICES
+ if (ServerOptions.ExecServiceEnabled)
+ {
+ if (m_HttpLaunchService != nullptr)
+ {
+ m_Http->RegisterService(*m_HttpLaunchService);
+ }
+ }
+#endif // ZEN_WITH_EXEC_SERVICES
#if ZEN_WITH_COMPUTE_SERVICES
if (ServerOptions.ComputeServiceEnabled)
{
- m_Http->RegisterService(*m_HttpLaunchService);
-
if (m_HttpFunctionService != nullptr)
{
m_Http->RegisterService(*m_HttpFunctionService);
@@ -365,9 +377,7 @@ public:
void InitializeState(const ZenServerOptions& ServerOptions);
void InitializeStructuredCache(const ZenServerOptions& ServerOptions);
-#if ZEN_WITH_COMPUTE_SERVICES
void InitializeCompute(const ZenServerOptions& ServerOptions);
-#endif
#if ZEN_ENABLE_MESH
void StartMesh(int BasePort)
@@ -610,16 +620,14 @@ private:
zen::HttpAdminService m_AdminService{m_GcScheduler};
zen::HttpHealthService m_HealthService;
zen::MeshTracker m_ZenMesh{m_IoContext};
+#if ZEN_WITH_EXEC_SERVICES
+ std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService;
+#endif // ZEN_WITH_EXEC_SERVICES
#if ZEN_WITH_COMPUTE_SERVICES
- std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService;
std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
-#endif
+#endif // ZEN_WITH_COMPUTE_SERVICES
std::unique_ptr<zen::HttpFrontendService> m_FrontendService;
-#if ZEN_USE_EXEC
- std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService;
-#endif
-
#if ZEN_USE_NAMED_PIPES
zen::Ref<zen::LocalProjectService> m_LocalProjectService;
#endif
@@ -837,7 +845,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
// Horde compute upstream
- if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.Url.empty() == false)
+ if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.JupiterConfig.Url.empty() == false)
{
std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name;
@@ -868,11 +876,8 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
.OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider,
.AccessToken = UpstreamConfig.JupiterConfig.AccessToken};
- std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply";
- zen::CreateDirectories(ApplySandboxDir);
m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore,
*m_CidStore,
- ApplySandboxDir,
ComputeOptions,
StorageOptions,
ComputeAuthConfig,
@@ -880,7 +885,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
*m_AuthMgr);
}
}
-#endif
+#endif // ZEN_WITH_COMPUTE_SERVICES
////////////////////////////////////////////////////////////////////////////////
diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp
index f49d5f6d8..3a4957b76 100644
--- a/zenutil/zenserverprocess.cpp
+++ b/zenutil/zenserverprocess.cpp
@@ -550,8 +550,8 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr
const std::filesystem::path BaseDir = m_Env.ProgramBaseDir();
const std::filesystem::path Executable = BaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL;
CreateProcOptions CreateOptions = {
- .WorkingDirectory = &CurrentDirectory,
- .Flags = CreationFlags,
+ .WorkingDirectory = &CurrentDirectory,
+ .Flags = CreationFlags,
};
CreateProcResult ChildPid = CreateProc(Executable, CommandLine.ToView(), CreateOptions);
#if ZEN_PLATFORM_WINDOWS