diff options
| author | Stefan Boberg <[email protected]> | 2026-02-18 11:28:03 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-02-18 11:28:03 +0100 |
| commit | 149a5c2faa8d59290b8b44717e504532e906aae2 (patch) | |
| tree | 9c875f1fd89f65f939bf8f6ef67b506565be845c /src/zenserver/compute/computeserver.cpp | |
| parent | add selective request logging support to http.sys (#762) (diff) | |
| download | zen-149a5c2faa8d59290b8b44717e504532e906aae2.tar.xz zen-149a5c2faa8d59290b8b44717e504532e906aae2.zip | |
structured compute basics (#714)
this change adds the `zencompute` component, which can be used to distribute work dispatched from UE using the DDB (Derived Data Build) APIs via zenserver
this change also adds a distinct zenserver compute mode (`zenserver compute`) which is intended to be used for leaf compute nodes
to exercise the compute functionality without directly involving UE, a `zen exec` subcommand is also added, which can be used to feed replays through the system
all new functionality is considered *experimental* and disabled by default at this time, behind the `zencompute` option in xmake config
Diffstat (limited to 'src/zenserver/compute/computeserver.cpp')
| -rw-r--r-- | src/zenserver/compute/computeserver.cpp | 330 |
1 files changed, 330 insertions, 0 deletions
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp new file mode 100644 index 000000000..173f56386 --- /dev/null +++ b/src/zenserver/compute/computeserver.cpp @@ -0,0 +1,330 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "computeserver.h" +#include <zencompute/httpfunctionservice.h> +#include "computeservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/fmtutils.h> +# include <zencore/memory/llm.h> +# include <zencore/memory/memorytrace.h> +# include <zencore/memory/tagtrace.h> +# include <zencore/scopeguard.h> +# include <zencore/sentryintegration.h> +# include <zencore/system.h> +# include <zencore/windows.h> +# include <zenhttp/httpapiservice.h> +# include <zenstore/cidstore.h> +# include <zenutil/service.h> + +ZEN_THIRD_PARTY_INCLUDES_START +# include <cxxopts.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +void +ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) +{ + Options.add_option("compute", + "", + "upstream-notification-endpoint", + "Endpoint URL for upstream notifications", + cxxopts::value<std::string>(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""), + ""); + + Options.add_option("compute", + "", + "instance-id", + "Instance ID for use in notifications", + cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""), + ""); +} + +void +ZenComputeServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenComputeServerConfigurator::ApplyOptions(cxxopts::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenComputeServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) +{ + ZEN_UNUSED(LuaOptions); +} + +void +ZenComputeServerConfigurator::ValidateOptions() +{ +} + +/////////////////////////////////////////////////////////////////////////// + +ZenComputeServer::ZenComputeServer() +{ +} + +ZenComputeServer::~ZenComputeServer() +{ + Cleanup(); +} + +int +ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) +{ + ZEN_TRACE_CPU("ZenComputeServer::Initialize"); + ZEN_MEMSCOPE(GetZenserverTag()); + + ZEN_INFO(ZEN_APP_NAME " initializing in HUB server mode"); + + const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry); + if (EffectiveBasePort < 0) + { + return EffectiveBasePort; + } + + // This is a workaround to make sure we can have automated tests. Without + // this the ranges for different child zen hub processes could overlap with + // the main test range. + ZenServerEnvironment::SetBaseChildId(1000); + + m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; + + InitializeState(ServerConfig); + InitializeServices(ServerConfig); + RegisterServices(ServerConfig); + + ZenServerBase::Finalize(); + + return EffectiveBasePort; +} + +void +ZenComputeServer::Cleanup() +{ + ZEN_TRACE_CPU("ZenStorageServer::Cleanup"); + ZEN_INFO(ZEN_APP_NAME " cleaning up"); + try + { + m_IoContext.stop(); + if (m_IoRunner.joinable()) + { + m_IoRunner.join(); + } + + if (m_Http) + { + m_Http->Close(); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); + } +} + +void +ZenComputeServer::InitializeState(const ZenComputeServerConfig& ServerConfig) +{ + ZEN_UNUSED(ServerConfig); +} + +void +ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) +{ + ZEN_INFO("initializing storage"); + + CidStoreConfiguration Config; + Config.RootDirectory = m_DataRoot / "cas"; + + m_CidStore = std::make_unique<CidStore>(m_GcManager); + m_CidStore->Initialize(Config); + + ZEN_INFO("instantiating API service"); + m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http); + + ZEN_INFO("instantiating compute service"); + m_ComputeService = std::make_unique<HttpComputeService>(ServerConfig.DataDir / "compute"); + + // Ref<zen::compute::FunctionRunner> Runner; + // Runner = zen::compute::CreateLocalRunner(*m_CidStore, ServerConfig.DataDir / "runner"); + + // TODO: (re)implement default configuration here + + ZEN_INFO("instantiating function service"); + m_FunctionService = + std::make_unique<zen::compute::HttpFunctionService>(*m_CidStore, m_StatsService, ServerConfig.DataDir / "functions"); +} + +void +ZenComputeServer::RegisterServices(const ZenComputeServerConfig& ServerConfig) +{ + ZEN_UNUSED(ServerConfig); + + if (m_ComputeService) + { + m_Http->RegisterService(*m_ComputeService); + } + + if (m_ApiService) + { + m_Http->RegisterService(*m_ApiService); + } + + if (m_FunctionService) + { + m_Http->RegisterService(*m_FunctionService); + } +} + +void +ZenComputeServer::Run() +{ + if (m_ProcessMonitor.IsActive()) + { + CheckOwnerPid(); + } + + if (!m_TestMode) + { + // clang-format off + ZEN_INFO( R"(__________ _________ __ )" "\n" + R"(\____ /____ ____ \_ ___ \ ____ _____ ______ __ ___/ |_ ____ )" "\n" + R"( / // __ \ / \/ \ \/ / _ \ / \\____ \| | \ __\/ __ \ )" "\n" + R"( / /\ ___/| | \ \___( <_> ) Y Y \ |_> > | /| | \ ___/ )" "\n" + R"(/_______ \___ >___| /\______ /\____/|__|_| / __/|____/ |__| \___ >)" "\n" + R"( \/ \/ \/ \/ \/|__| \/ )"); + // clang-format on + + ExtendableStringBuilder<256> BuildOptions; + GetBuildOptions(BuildOptions, '\n'); + ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions); + } + + ZEN_INFO(ZEN_APP_NAME " now running as COMPUTE (pid: {})", GetCurrentProcessId()); + +# if ZEN_PLATFORM_WINDOWS + if (zen::windows::IsRunningOnWine()) + { + ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); + } +# endif + +# if ZEN_USE_SENTRY + ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); + if (m_UseSentry) + { + SentryIntegration::ClearCaches(); + } +# endif + + if (m_DebugOptionForcedCrash) + { + ZEN_DEBUG_BREAK(); + } + + const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode; + + SetNewState(kRunning); + + OnReady(); + + m_Http->Run(IsInteractiveMode); + + SetNewState(kShuttingDown); + + ZEN_INFO(ZEN_APP_NAME " exiting"); +} + +////////////////////////////////////////////////////////////////////////////////// + +ZenComputeServerMain::ZenComputeServerMain(ZenComputeServerConfig& ServerOptions) +: ZenServerMain(ServerOptions) +, m_ServerOptions(ServerOptions) +{ +} + +void +ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) +{ + ZenComputeServer Server; + Server.SetDataRoot(m_ServerOptions.DataDir); + Server.SetContentRoot(m_ServerOptions.ContentDir); + Server.SetTestMode(m_ServerOptions.IsTest); + Server.SetDedicatedMode(m_ServerOptions.IsDedicated); + + const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); + if (EffectiveBasePort == -1) + { + // Server.Initialize has already logged what the issue is - just exit with failure code here. + std::exit(1); + } + + Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); + if (EffectiveBasePort != m_ServerOptions.BasePort) + { + ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); + m_ServerOptions.BasePort = EffectiveBasePort; + } + + std::unique_ptr<std::thread> ShutdownThread; + std::unique_ptr<NamedEvent> ShutdownEvent; + + ExtendableStringBuilder<64> ShutdownEventName; + ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; + ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); + + // Monitor shutdown signals + + ShutdownThread.reset(new std::thread{[&] { + SetCurrentThreadName("shutdown_mon"); + + ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); + + if (ShutdownEvent->Wait()) + { + ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); + Server.RequestExit(0); + } + else + { + ZEN_INFO("shutdown signal wait() failed"); + } + }}); + + auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { + ReportServiceStatus(ServiceStatus::Stopping); + + if (ShutdownEvent) + { + ShutdownEvent->Set(); + } + if (ShutdownThread && ShutdownThread->joinable()) + { + ShutdownThread->join(); + } + }); + + // If we have a parent process, establish the mechanisms we need + // to be able to communicate readiness with the parent + + Server.SetIsReadyFunc([&] { + std::error_code Ec; + m_LockFile.Update(MakeLockData(true), Ec); + ReportServiceStatus(ServiceStatus::Running); + NotifyReady(); + }); + + Server.Run(); +} + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES |