aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/compute
diff options
context:
space:
mode:
authorzousar <[email protected]>2026-02-18 23:19:14 -0700
committerzousar <[email protected]>2026-02-18 23:19:14 -0700
commit2ba28acaf034722452f82cfb07afc0a4bb90eeab (patch)
treec00dea385597180673be6e02aca6c07d9ef6ec00 /src/zenserver/compute
parentupdatefrontend (diff)
parentstructured compute basics (#714) (diff)
downloadzen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.tar.xz
zen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.zip
Merge branch 'main' into zs/web-ui-improvements
Diffstat (limited to 'src/zenserver/compute')
-rw-r--r--src/zenserver/compute/computeserver.cpp330
-rw-r--r--src/zenserver/compute/computeserver.h106
-rw-r--r--src/zenserver/compute/computeservice.cpp100
-rw-r--r--src/zenserver/compute/computeservice.h36
4 files changed, 572 insertions, 0 deletions
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
new file mode 100644
index 000000000..173f56386
--- /dev/null
+++ b/src/zenserver/compute/computeserver.cpp
@@ -0,0 +1,330 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "computeserver.h"
+#include <zencompute/httpfunctionservice.h>
+#include "computeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/fmtutils.h>
+# include <zencore/memory/llm.h>
+# include <zencore/memory/memorytrace.h>
+# include <zencore/memory/tagtrace.h>
+# include <zencore/scopeguard.h>
+# include <zencore/sentryintegration.h>
+# include <zencore/system.h>
+# include <zencore/windows.h>
+# include <zenhttp/httpapiservice.h>
+# include <zenstore/cidstore.h>
+# include <zenutil/service.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <cxxopts.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+void
+ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
+{
+ Options.add_option("compute",
+ "",
+ "upstream-notification-endpoint",
+ "Endpoint URL for upstream notifications",
+ cxxopts::value<std::string>(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""),
+ "");
+
+ Options.add_option("compute",
+ "",
+ "instance-id",
+ "Instance ID for use in notifications",
+ cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""),
+ "");
+}
+
+void
+ZenComputeServerConfigurator::AddConfigOptions(LuaConfig::Options& Options)
+{
+ ZEN_UNUSED(Options);
+}
+
+void
+ZenComputeServerConfigurator::ApplyOptions(cxxopts::Options& Options)
+{
+ ZEN_UNUSED(Options);
+}
+
+void
+ZenComputeServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions)
+{
+ ZEN_UNUSED(LuaOptions);
+}
+
+void
+ZenComputeServerConfigurator::ValidateOptions()
+{
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+ZenComputeServer::ZenComputeServer()
+{
+}
+
+ZenComputeServer::~ZenComputeServer()
+{
+ Cleanup();
+}
+
+int
+ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry)
+{
+ ZEN_TRACE_CPU("ZenComputeServer::Initialize");
+ ZEN_MEMSCOPE(GetZenserverTag());
+
+ ZEN_INFO(ZEN_APP_NAME " initializing in HUB server mode");
+
+ const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry);
+ if (EffectiveBasePort < 0)
+ {
+ return EffectiveBasePort;
+ }
+
+ // This is a workaround to make sure we can have automated tests. Without
+ // this the ranges for different child zen hub processes could overlap with
+ // the main test range.
+ ZenServerEnvironment::SetBaseChildId(1000);
+
+ m_DebugOptionForcedCrash = ServerConfig.ShouldCrash;
+
+ InitializeState(ServerConfig);
+ InitializeServices(ServerConfig);
+ RegisterServices(ServerConfig);
+
+ ZenServerBase::Finalize();
+
+ return EffectiveBasePort;
+}
+
+void
+ZenComputeServer::Cleanup()
+{
+ ZEN_TRACE_CPU("ZenStorageServer::Cleanup");
+ ZEN_INFO(ZEN_APP_NAME " cleaning up");
+ try
+ {
+ m_IoContext.stop();
+ if (m_IoRunner.joinable())
+ {
+ m_IoRunner.join();
+ }
+
+ if (m_Http)
+ {
+ m_Http->Close();
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what());
+ }
+}
+
+void
+ZenComputeServer::InitializeState(const ZenComputeServerConfig& ServerConfig)
+{
+ ZEN_UNUSED(ServerConfig);
+}
+
+void
+ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
+{
+ ZEN_INFO("initializing storage");
+
+ CidStoreConfiguration Config;
+ Config.RootDirectory = m_DataRoot / "cas";
+
+ m_CidStore = std::make_unique<CidStore>(m_GcManager);
+ m_CidStore->Initialize(Config);
+
+ ZEN_INFO("instantiating API service");
+ m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
+
+ ZEN_INFO("instantiating compute service");
+ m_ComputeService = std::make_unique<HttpComputeService>(ServerConfig.DataDir / "compute");
+
+ // Ref<zen::compute::FunctionRunner> Runner;
+ // Runner = zen::compute::CreateLocalRunner(*m_CidStore, ServerConfig.DataDir / "runner");
+
+ // TODO: (re)implement default configuration here
+
+ ZEN_INFO("instantiating function service");
+ m_FunctionService =
+ std::make_unique<zen::compute::HttpFunctionService>(*m_CidStore, m_StatsService, ServerConfig.DataDir / "functions");
+}
+
+void
+ZenComputeServer::RegisterServices(const ZenComputeServerConfig& ServerConfig)
+{
+ ZEN_UNUSED(ServerConfig);
+
+ if (m_ComputeService)
+ {
+ m_Http->RegisterService(*m_ComputeService);
+ }
+
+ if (m_ApiService)
+ {
+ m_Http->RegisterService(*m_ApiService);
+ }
+
+ if (m_FunctionService)
+ {
+ m_Http->RegisterService(*m_FunctionService);
+ }
+}
+
+void
+ZenComputeServer::Run()
+{
+ if (m_ProcessMonitor.IsActive())
+ {
+ CheckOwnerPid();
+ }
+
+ if (!m_TestMode)
+ {
+ // clang-format off
+ ZEN_INFO( R"(__________ _________ __ )" "\n"
+ R"(\____ /____ ____ \_ ___ \ ____ _____ ______ __ ___/ |_ ____ )" "\n"
+ R"( / // __ \ / \/ \ \/ / _ \ / \\____ \| | \ __\/ __ \ )" "\n"
+ R"( / /\ ___/| | \ \___( <_> ) Y Y \ |_> > | /| | \ ___/ )" "\n"
+ R"(/_______ \___ >___| /\______ /\____/|__|_| / __/|____/ |__| \___ >)" "\n"
+ R"( \/ \/ \/ \/ \/|__| \/ )");
+ // clang-format on
+
+ ExtendableStringBuilder<256> BuildOptions;
+ GetBuildOptions(BuildOptions, '\n');
+ ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions);
+ }
+
+ ZEN_INFO(ZEN_APP_NAME " now running as COMPUTE (pid: {})", GetCurrentProcessId());
+
+# if ZEN_PLATFORM_WINDOWS
+ if (zen::windows::IsRunningOnWine())
+ {
+ ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well");
+ }
+# endif
+
+# if ZEN_USE_SENTRY
+ ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED");
+ if (m_UseSentry)
+ {
+ SentryIntegration::ClearCaches();
+ }
+# endif
+
+ if (m_DebugOptionForcedCrash)
+ {
+ ZEN_DEBUG_BREAK();
+ }
+
+ const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode;
+
+ SetNewState(kRunning);
+
+ OnReady();
+
+ m_Http->Run(IsInteractiveMode);
+
+ SetNewState(kShuttingDown);
+
+ ZEN_INFO(ZEN_APP_NAME " exiting");
+}
+
+//////////////////////////////////////////////////////////////////////////////////
+
+ZenComputeServerMain::ZenComputeServerMain(ZenComputeServerConfig& ServerOptions)
+: ZenServerMain(ServerOptions)
+, m_ServerOptions(ServerOptions)
+{
+}
+
+void
+ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry)
+{
+ ZenComputeServer Server;
+ Server.SetDataRoot(m_ServerOptions.DataDir);
+ Server.SetContentRoot(m_ServerOptions.ContentDir);
+ Server.SetTestMode(m_ServerOptions.IsTest);
+ Server.SetDedicatedMode(m_ServerOptions.IsDedicated);
+
+ const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry);
+ if (EffectiveBasePort == -1)
+ {
+ // Server.Initialize has already logged what the issue is - just exit with failure code here.
+ std::exit(1);
+ }
+
+ Entry->EffectiveListenPort = uint16_t(EffectiveBasePort);
+ if (EffectiveBasePort != m_ServerOptions.BasePort)
+ {
+ ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort);
+ m_ServerOptions.BasePort = EffectiveBasePort;
+ }
+
+ std::unique_ptr<std::thread> ShutdownThread;
+ std::unique_ptr<NamedEvent> ShutdownEvent;
+
+ ExtendableStringBuilder<64> ShutdownEventName;
+ ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown";
+ ShutdownEvent.reset(new NamedEvent{ShutdownEventName});
+
+ // Monitor shutdown signals
+
+ ShutdownThread.reset(new std::thread{[&] {
+ SetCurrentThreadName("shutdown_mon");
+
+ ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId());
+
+ if (ShutdownEvent->Wait())
+ {
+ ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId());
+ Server.RequestExit(0);
+ }
+ else
+ {
+ ZEN_INFO("shutdown signal wait() failed");
+ }
+ }});
+
+ auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] {
+ ReportServiceStatus(ServiceStatus::Stopping);
+
+ if (ShutdownEvent)
+ {
+ ShutdownEvent->Set();
+ }
+ if (ShutdownThread && ShutdownThread->joinable())
+ {
+ ShutdownThread->join();
+ }
+ });
+
+ // If we have a parent process, establish the mechanisms we need
+ // to be able to communicate readiness with the parent
+
+ Server.SetIsReadyFunc([&] {
+ std::error_code Ec;
+ m_LockFile.Update(MakeLockData(true), Ec);
+ ReportServiceStatus(ServiceStatus::Running);
+ NotifyReady();
+ });
+
+ Server.Run();
+}
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/compute/computeserver.h b/src/zenserver/compute/computeserver.h
new file mode 100644
index 000000000..625140b23
--- /dev/null
+++ b/src/zenserver/compute/computeserver.h
@@ -0,0 +1,106 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zenserver.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zenstore/gc.h>
+
+namespace cxxopts {
+class Options;
+}
+namespace zen::LuaConfig {
+struct Options;
+}
+
+namespace zen::compute {
+class HttpFunctionService;
+}
+
+namespace zen {
+
+class CidStore;
+class HttpApiService;
+class HttpComputeService;
+
+struct ZenComputeServerConfig : public ZenServerConfig
+{
+ std::string UpstreamNotificationEndpoint;
+ std::string InstanceId; // For use in notifications
+};
+
+struct ZenComputeServerConfigurator : public ZenServerConfiguratorBase
+{
+ ZenComputeServerConfigurator(ZenComputeServerConfig& ServerOptions)
+ : ZenServerConfiguratorBase(ServerOptions)
+ , m_ServerOptions(ServerOptions)
+ {
+ }
+
+ ~ZenComputeServerConfigurator() = default;
+
+private:
+ virtual void AddCliOptions(cxxopts::Options& Options) override;
+ virtual void AddConfigOptions(LuaConfig::Options& Options) override;
+ virtual void ApplyOptions(cxxopts::Options& Options) override;
+ virtual void OnConfigFileParsed(LuaConfig::Options& LuaOptions) override;
+ virtual void ValidateOptions() override;
+
+ ZenComputeServerConfig& m_ServerOptions;
+};
+
+class ZenComputeServerMain : public ZenServerMain
+{
+public:
+ ZenComputeServerMain(ZenComputeServerConfig& ServerOptions);
+ virtual void DoRun(ZenServerState::ZenServerEntry* Entry) override;
+
+ ZenComputeServerMain(const ZenComputeServerMain&) = delete;
+ ZenComputeServerMain& operator=(const ZenComputeServerMain&) = delete;
+
+ typedef ZenComputeServerConfig Config;
+ typedef ZenComputeServerConfigurator Configurator;
+
+private:
+ ZenComputeServerConfig& m_ServerOptions;
+};
+
+/**
+ * The compute server handles DDC build function execution requests
+ * only. It's intended to be used on a pure compute resource and does
+ * not handle any storage tasks. The actual scheduling happens upstream
+ * in a storage server instance.
+ */
+
+class ZenComputeServer : public ZenServerBase
+{
+ ZenComputeServer& operator=(ZenComputeServer&&) = delete;
+ ZenComputeServer(ZenComputeServer&&) = delete;
+
+public:
+ ZenComputeServer();
+ ~ZenComputeServer();
+
+ int Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry);
+ void Run();
+ void Cleanup();
+
+private:
+ HttpStatsService m_StatsService;
+ GcManager m_GcManager;
+ GcScheduler m_GcScheduler{m_GcManager};
+ std::unique_ptr<CidStore> m_CidStore;
+ std::unique_ptr<HttpComputeService> m_ComputeService;
+ std::unique_ptr<HttpApiService> m_ApiService;
+ std::unique_ptr<zen::compute::HttpFunctionService> m_FunctionService;
+
+ void InitializeState(const ZenComputeServerConfig& ServerConfig);
+ void InitializeServices(const ZenComputeServerConfig& ServerConfig);
+ void RegisterServices(const ZenComputeServerConfig& ServerConfig);
+};
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/compute/computeservice.cpp b/src/zenserver/compute/computeservice.cpp
new file mode 100644
index 000000000..2c0bc0ae9
--- /dev/null
+++ b/src/zenserver/compute/computeservice.cpp
@@ -0,0 +1,100 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "computeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/logging.h>
+# include <zencore/system.h>
+# include <zenutil/zenserverprocess.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <EASTL/fixed_vector.h>
+# include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+# include <unordered_map>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+struct ResourceMetrics
+{
+ uint64_t DiskUsageBytes = 0;
+ uint64_t MemoryUsageBytes = 0;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+struct HttpComputeService::Impl
+{
+ Impl(const Impl&) = delete;
+ Impl& operator=(const Impl&) = delete;
+
+ Impl();
+ ~Impl();
+
+ void Initialize(std::filesystem::path BaseDir) { ZEN_UNUSED(BaseDir); }
+
+ void Cleanup() {}
+
+private:
+};
+
+HttpComputeService::Impl::Impl()
+{
+}
+
+HttpComputeService::Impl::~Impl()
+{
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+HttpComputeService::HttpComputeService(std::filesystem::path BaseDir) : m_Impl(std::make_unique<Impl>())
+{
+ using namespace std::literals;
+
+ m_Impl->Initialize(BaseDir);
+
+ m_Router.RegisterRoute(
+ "status",
+ [this](HttpRouterRequest& Req) {
+ CbObjectWriter Obj;
+ Obj.BeginArray("modules");
+ Obj.EndArray();
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "stats",
+ [this](HttpRouterRequest& Req) {
+ CbObjectWriter Obj;
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+}
+
+HttpComputeService::~HttpComputeService()
+{
+}
+
+const char*
+HttpComputeService::BaseUri() const
+{
+ return "/compute/";
+}
+
+void
+HttpComputeService::HandleRequest(zen::HttpServerRequest& Request)
+{
+ m_Router.HandleRequest(Request);
+}
+
+} // namespace zen
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/compute/computeservice.h b/src/zenserver/compute/computeservice.h
new file mode 100644
index 000000000..339200dd8
--- /dev/null
+++ b/src/zenserver/compute/computeservice.h
@@ -0,0 +1,36 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhttp/httpserver.h>
+
+#if ZEN_WITH_COMPUTE_SERVICES
+namespace zen {
+
+/** ZenServer Compute Service
+ *
+ * Manages a set of compute workers for use in UEFN content worker
+ *
+ */
+class HttpComputeService : public zen::HttpService
+{
+public:
+ HttpComputeService(std::filesystem::path BaseDir);
+ ~HttpComputeService();
+
+ HttpComputeService(const HttpComputeService&) = delete;
+ HttpComputeService& operator=(const HttpComputeService&) = delete;
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+
+private:
+ HttpRequestRouter m_Router;
+
+ struct Impl;
+
+ std::unique_ptr<Impl> m_Impl;
+};
+
+} // namespace zen
+#endif // ZEN_WITH_COMPUTE_SERVICES