aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/compute/computeserver.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-02-18 11:28:03 +0100
committerGitHub Enterprise <[email protected]>2026-02-18 11:28:03 +0100
commit149a5c2faa8d59290b8b44717e504532e906aae2 (patch)
tree9c875f1fd89f65f939bf8f6ef67b506565be845c /src/zenserver/compute/computeserver.cpp
parentadd selective request logging support to http.sys (#762) (diff)
downloadzen-149a5c2faa8d59290b8b44717e504532e906aae2.tar.xz
zen-149a5c2faa8d59290b8b44717e504532e906aae2.zip
structured compute basics (#714)
this change adds the `zencompute` component, which can be used to distribute work dispatched from UE using the DDB (Derived Data Build) APIs via zenserver this change also adds a distinct zenserver compute mode (`zenserver compute`) which is intended to be used for leaf compute nodes to exercise the compute functionality without directly involving UE, a `zen exec` subcommand is also added, which can be used to feed replays through the system all new functionality is considered *experimental* and disabled by default at this time, behind the `zencompute` option in xmake config
Diffstat (limited to 'src/zenserver/compute/computeserver.cpp')
-rw-r--r--src/zenserver/compute/computeserver.cpp330
1 files changed, 330 insertions, 0 deletions
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
new file mode 100644
index 000000000..173f56386
--- /dev/null
+++ b/src/zenserver/compute/computeserver.cpp
@@ -0,0 +1,330 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "computeserver.h"
+#include <zencompute/httpfunctionservice.h>
+#include "computeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/fmtutils.h>
+# include <zencore/memory/llm.h>
+# include <zencore/memory/memorytrace.h>
+# include <zencore/memory/tagtrace.h>
+# include <zencore/scopeguard.h>
+# include <zencore/sentryintegration.h>
+# include <zencore/system.h>
+# include <zencore/windows.h>
+# include <zenhttp/httpapiservice.h>
+# include <zenstore/cidstore.h>
+# include <zenutil/service.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <cxxopts.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+void
+ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
+{
+ Options.add_option("compute",
+ "",
+ "upstream-notification-endpoint",
+ "Endpoint URL for upstream notifications",
+ cxxopts::value<std::string>(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""),
+ "");
+
+ Options.add_option("compute",
+ "",
+ "instance-id",
+ "Instance ID for use in notifications",
+ cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""),
+ "");
+}
+
+void
+ZenComputeServerConfigurator::AddConfigOptions(LuaConfig::Options& Options)
+{
+ ZEN_UNUSED(Options);
+}
+
+void
+ZenComputeServerConfigurator::ApplyOptions(cxxopts::Options& Options)
+{
+ ZEN_UNUSED(Options);
+}
+
+void
+ZenComputeServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions)
+{
+ ZEN_UNUSED(LuaOptions);
+}
+
+void
+ZenComputeServerConfigurator::ValidateOptions()
+{
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+ZenComputeServer::ZenComputeServer()
+{
+}
+
+ZenComputeServer::~ZenComputeServer()
+{
+ Cleanup();
+}
+
+int
+ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry)
+{
+ ZEN_TRACE_CPU("ZenComputeServer::Initialize");
+ ZEN_MEMSCOPE(GetZenserverTag());
+
+ ZEN_INFO(ZEN_APP_NAME " initializing in HUB server mode");
+
+ const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry);
+ if (EffectiveBasePort < 0)
+ {
+ return EffectiveBasePort;
+ }
+
+ // This is a workaround to make sure we can have automated tests. Without
+ // this the ranges for different child zen hub processes could overlap with
+ // the main test range.
+ ZenServerEnvironment::SetBaseChildId(1000);
+
+ m_DebugOptionForcedCrash = ServerConfig.ShouldCrash;
+
+ InitializeState(ServerConfig);
+ InitializeServices(ServerConfig);
+ RegisterServices(ServerConfig);
+
+ ZenServerBase::Finalize();
+
+ return EffectiveBasePort;
+}
+
+void
+ZenComputeServer::Cleanup()
+{
+ ZEN_TRACE_CPU("ZenStorageServer::Cleanup");
+ ZEN_INFO(ZEN_APP_NAME " cleaning up");
+ try
+ {
+ m_IoContext.stop();
+ if (m_IoRunner.joinable())
+ {
+ m_IoRunner.join();
+ }
+
+ if (m_Http)
+ {
+ m_Http->Close();
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what());
+ }
+}
+
+void
+ZenComputeServer::InitializeState(const ZenComputeServerConfig& ServerConfig)
+{
+ ZEN_UNUSED(ServerConfig);
+}
+
+void
+ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
+{
+ ZEN_INFO("initializing storage");
+
+ CidStoreConfiguration Config;
+ Config.RootDirectory = m_DataRoot / "cas";
+
+ m_CidStore = std::make_unique<CidStore>(m_GcManager);
+ m_CidStore->Initialize(Config);
+
+ ZEN_INFO("instantiating API service");
+ m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
+
+ ZEN_INFO("instantiating compute service");
+ m_ComputeService = std::make_unique<HttpComputeService>(ServerConfig.DataDir / "compute");
+
+ // Ref<zen::compute::FunctionRunner> Runner;
+ // Runner = zen::compute::CreateLocalRunner(*m_CidStore, ServerConfig.DataDir / "runner");
+
+ // TODO: (re)implement default configuration here
+
+ ZEN_INFO("instantiating function service");
+ m_FunctionService =
+ std::make_unique<zen::compute::HttpFunctionService>(*m_CidStore, m_StatsService, ServerConfig.DataDir / "functions");
+}
+
+void
+ZenComputeServer::RegisterServices(const ZenComputeServerConfig& ServerConfig)
+{
+ ZEN_UNUSED(ServerConfig);
+
+ if (m_ComputeService)
+ {
+ m_Http->RegisterService(*m_ComputeService);
+ }
+
+ if (m_ApiService)
+ {
+ m_Http->RegisterService(*m_ApiService);
+ }
+
+ if (m_FunctionService)
+ {
+ m_Http->RegisterService(*m_FunctionService);
+ }
+}
+
+void
+ZenComputeServer::Run()
+{
+ if (m_ProcessMonitor.IsActive())
+ {
+ CheckOwnerPid();
+ }
+
+ if (!m_TestMode)
+ {
+ // clang-format off
+ ZEN_INFO( R"(__________ _________ __ )" "\n"
+ R"(\____ /____ ____ \_ ___ \ ____ _____ ______ __ ___/ |_ ____ )" "\n"
+ R"( / // __ \ / \/ \ \/ / _ \ / \\____ \| | \ __\/ __ \ )" "\n"
+ R"( / /\ ___/| | \ \___( <_> ) Y Y \ |_> > | /| | \ ___/ )" "\n"
+ R"(/_______ \___ >___| /\______ /\____/|__|_| / __/|____/ |__| \___ >)" "\n"
+ R"( \/ \/ \/ \/ \/|__| \/ )");
+ // clang-format on
+
+ ExtendableStringBuilder<256> BuildOptions;
+ GetBuildOptions(BuildOptions, '\n');
+ ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions);
+ }
+
+ ZEN_INFO(ZEN_APP_NAME " now running as COMPUTE (pid: {})", GetCurrentProcessId());
+
+# if ZEN_PLATFORM_WINDOWS
+ if (zen::windows::IsRunningOnWine())
+ {
+ ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well");
+ }
+# endif
+
+# if ZEN_USE_SENTRY
+ ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED");
+ if (m_UseSentry)
+ {
+ SentryIntegration::ClearCaches();
+ }
+# endif
+
+ if (m_DebugOptionForcedCrash)
+ {
+ ZEN_DEBUG_BREAK();
+ }
+
+ const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode;
+
+ SetNewState(kRunning);
+
+ OnReady();
+
+ m_Http->Run(IsInteractiveMode);
+
+ SetNewState(kShuttingDown);
+
+ ZEN_INFO(ZEN_APP_NAME " exiting");
+}
+
+//////////////////////////////////////////////////////////////////////////////////
+
+ZenComputeServerMain::ZenComputeServerMain(ZenComputeServerConfig& ServerOptions)
+: ZenServerMain(ServerOptions)
+, m_ServerOptions(ServerOptions)
+{
+}
+
+void
+ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry)
+{
+ ZenComputeServer Server;
+ Server.SetDataRoot(m_ServerOptions.DataDir);
+ Server.SetContentRoot(m_ServerOptions.ContentDir);
+ Server.SetTestMode(m_ServerOptions.IsTest);
+ Server.SetDedicatedMode(m_ServerOptions.IsDedicated);
+
+ const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry);
+ if (EffectiveBasePort == -1)
+ {
+ // Server.Initialize has already logged what the issue is - just exit with failure code here.
+ std::exit(1);
+ }
+
+ Entry->EffectiveListenPort = uint16_t(EffectiveBasePort);
+ if (EffectiveBasePort != m_ServerOptions.BasePort)
+ {
+ ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort);
+ m_ServerOptions.BasePort = EffectiveBasePort;
+ }
+
+ std::unique_ptr<std::thread> ShutdownThread;
+ std::unique_ptr<NamedEvent> ShutdownEvent;
+
+ ExtendableStringBuilder<64> ShutdownEventName;
+ ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown";
+ ShutdownEvent.reset(new NamedEvent{ShutdownEventName});
+
+ // Monitor shutdown signals
+
+ ShutdownThread.reset(new std::thread{[&] {
+ SetCurrentThreadName("shutdown_mon");
+
+ ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId());
+
+ if (ShutdownEvent->Wait())
+ {
+ ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId());
+ Server.RequestExit(0);
+ }
+ else
+ {
+ ZEN_INFO("shutdown signal wait() failed");
+ }
+ }});
+
+ auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] {
+ ReportServiceStatus(ServiceStatus::Stopping);
+
+ if (ShutdownEvent)
+ {
+ ShutdownEvent->Set();
+ }
+ if (ShutdownThread && ShutdownThread->joinable())
+ {
+ ShutdownThread->join();
+ }
+ });
+
+ // If we have a parent process, establish the mechanisms we need
+ // to be able to communicate readiness with the parent
+
+ Server.SetIsReadyFunc([&] {
+ std::error_code Ec;
+ m_LockFile.Update(MakeLockData(true), Ec);
+ ReportServiceStatus(ServiceStatus::Running);
+ NotifyReady();
+ });
+
+ Server.Run();
+}
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES