aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/include
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-02-18 11:28:03 +0100
committerGitHub Enterprise <[email protected]>2026-02-18 11:28:03 +0100
commit149a5c2faa8d59290b8b44717e504532e906aae2 (patch)
tree9c875f1fd89f65f939bf8f6ef67b506565be845c /src/zencompute/include
parentadd selective request logging support to http.sys (#762) (diff)
downloadzen-149a5c2faa8d59290b8b44717e504532e906aae2.tar.xz
zen-149a5c2faa8d59290b8b44717e504532e906aae2.zip
structured compute basics (#714)
this change adds the `zencompute` component, which can be used to distribute work dispatched from UE using the DDB (Derived Data Build) APIs via zenserver this change also adds a distinct zenserver compute mode (`zenserver compute`) which is intended to be used for leaf compute nodes to exercise the compute functionality without directly involving UE, a `zen exec` subcommand is also added, which can be used to feed replays through the system all new functionality is considered *experimental* and disabled by default at this time, behind the `zencompute` option in xmake config
Diffstat (limited to 'src/zencompute/include')
-rw-r--r--src/zencompute/include/zencompute/functionservice.h132
-rw-r--r--src/zencompute/include/zencompute/httpfunctionservice.h73
-rw-r--r--src/zencompute/include/zencompute/httporchestrator.h44
-rw-r--r--src/zencompute/include/zencompute/recordingreader.h127
-rw-r--r--src/zencompute/include/zencompute/zencompute.h11
5 files changed, 387 insertions, 0 deletions
diff --git a/src/zencompute/include/zencompute/functionservice.h b/src/zencompute/include/zencompute/functionservice.h
new file mode 100644
index 000000000..1deb99fd5
--- /dev/null
+++ b/src/zencompute/include/zencompute/functionservice.h
@@ -0,0 +1,132 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#if !defined(ZEN_WITH_COMPUTE_SERVICES)
+# define ZEN_WITH_COMPUTE_SERVICES 1
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/iohash.h>
+# include <zenstore/zenstore.h>
+# include <zenhttp/httpcommon.h>
+
+# include <filesystem>
+
+namespace zen {
+class ChunkResolver;
+class CbObjectWriter;
+} // namespace zen
+
+namespace zen::compute {
+
+class ActionRecorder;
+class FunctionServiceSession;
+class IActionResultHandler;
+class LocalProcessRunner;
+class RemoteHttpRunner;
+struct RunnerAction;
+struct SubmitResult;
+
+struct WorkerDesc
+{
+ CbPackage Descriptor;
+ IoHash WorkerId{IoHash::Zero};
+
+ inline operator bool() const { return WorkerId != IoHash::Zero; }
+};
+
+/**
+ * Lambda style compute function service
+ *
+ * The responsibility of this class is to accept function execution requests, and
+ * schedule them using one or more FunctionRunner instances. It will basically always
+ * accept requests, queueing them if necessary, and then hand them off to runners
+ * as they become available.
+ *
+ * This is typically fronted by an API service that handles communication with clients.
+ */
+class FunctionServiceSession final
+{
+public:
+ FunctionServiceSession(ChunkResolver& InChunkResolver);
+ ~FunctionServiceSession();
+
+ void Shutdown();
+ bool IsHealthy();
+
+ // Worker registration and discovery
+
+ void RegisterWorker(CbPackage Worker);
+ [[nodiscard]] WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId);
+ [[nodiscard]] std::vector<IoHash> GetKnownWorkerIds();
+
+ // Action runners
+
+ void AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath);
+ void AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName);
+
+ // Action submission
+
+ struct EnqueueResult
+ {
+ int Lsn;
+ CbObject ResponseMessage;
+
+ inline operator bool() const { return Lsn != 0; }
+ };
+
+ [[nodiscard]] EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int Priority);
+ [[nodiscard]] EnqueueResult EnqueueAction(CbObject ActionObject, int Priority);
+
+ // Completed action tracking
+
+ [[nodiscard]] HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage);
+ [[nodiscard]] HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage);
+
+ void GetCompleted(CbWriter&);
+
+ // Action history tracking (note that this is separate from completed action tracking, and
+ // will include actions which have been retired and no longer have their results available)
+
+ struct ActionHistoryEntry
+ {
+ int Lsn;
+ IoHash ActionId;
+ IoHash WorkerId;
+ CbObject ActionDescriptor;
+ bool Succeeded;
+ uint64_t Timestamps[5] = {};
+ };
+
+ [[nodiscard]] std::vector<ActionHistoryEntry> GetActionHistory(int Limit = 100);
+
+ // Stats reporting
+
+ void EmitStats(CbObjectWriter& Cbo);
+
+ // Recording
+
+ void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath);
+ void StopRecording();
+
+private:
+ void PostUpdate(RunnerAction* Action);
+
+ friend class FunctionRunner;
+ friend struct RunnerAction;
+
+ struct Impl;
+ std::unique_ptr<Impl> m_Impl;
+};
+
+void function_forcelink();
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/include/zencompute/httpfunctionservice.h b/src/zencompute/include/zencompute/httpfunctionservice.h
new file mode 100644
index 000000000..6e2344ae6
--- /dev/null
+++ b/src/zencompute/include/zencompute/httpfunctionservice.h
@@ -0,0 +1,73 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#if !defined(ZEN_WITH_COMPUTE_SERVICES)
+# define ZEN_WITH_COMPUTE_SERVICES 1
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "zencompute/functionservice.h"
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zentelemetry/stats.h>
+# include <zenhttp/httpserver.h>
+
+# include <deque>
+# include <filesystem>
+# include <unordered_map>
+
+namespace zen {
+class CidStore;
+}
+
+namespace zen::compute {
+
+class HttpFunctionService;
+class FunctionService;
+
+/**
+ * HTTP interface for compute function service
+ */
+class HttpFunctionService : public HttpService, public IHttpStatsProvider
+{
+public:
+ HttpFunctionService(CidStore& InCidStore, IHttpStatsService& StatsService, const std::filesystem::path& BaseDir);
+ ~HttpFunctionService();
+
+ void Shutdown();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+
+ // IHttpStatsProvider
+
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+
+protected:
+ CidStore& m_CidStore;
+ IHttpStatsService& m_StatsService;
+ LoggerRef Log() { return m_Log; }
+
+private:
+ LoggerRef m_Log;
+ std::filesystem ::path m_BaseDir;
+ HttpRequestRouter m_Router;
+ FunctionServiceSession m_FunctionService;
+
+ // Metrics
+
+ metrics::OperationTiming m_HttpRequests;
+};
+
+void httpfunction_forcelink();
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/include/zencompute/httporchestrator.h b/src/zencompute/include/zencompute/httporchestrator.h
new file mode 100644
index 000000000..168c6d7fe
--- /dev/null
+++ b/src/zencompute/include/zencompute/httporchestrator.h
@@ -0,0 +1,44 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zenhttp/httpserver.h>
+
+#include <unordered_map>
+
+namespace zen::compute {
+
+/**
+ * Mock orchestrator service, for testing dynamic provisioning
+ */
+
+class HttpOrchestratorService : public HttpService
+{
+public:
+ HttpOrchestratorService();
+ ~HttpOrchestratorService();
+
+ HttpOrchestratorService(const HttpOrchestratorService&) = delete;
+ HttpOrchestratorService& operator=(const HttpOrchestratorService&) = delete;
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+
+private:
+ HttpRequestRouter m_Router;
+ LoggerRef m_Log;
+
+ struct KnownWorker
+ {
+ std::string_view BaseUri;
+ Stopwatch LastSeen;
+ };
+
+ RwLock m_KnownWorkersLock;
+ std::unordered_map<std::string, KnownWorker> m_KnownWorkers;
+};
+
+} // namespace zen::compute
diff --git a/src/zencompute/include/zencompute/recordingreader.h b/src/zencompute/include/zencompute/recordingreader.h
new file mode 100644
index 000000000..bf1aff125
--- /dev/null
+++ b/src/zencompute/include/zencompute/recordingreader.h
@@ -0,0 +1,127 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencompute/functionservice.h>
+#include <zencompute/zencompute.h>
+#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/gc.h>
+#include <zenstore/zenstore.h>
+
+#include <filesystem>
+#include <functional>
+#include <unordered_map>
+
+namespace zen {
+class CbObject;
+class CbPackage;
+struct IoHash;
+} // namespace zen
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen::compute {
+
+//////////////////////////////////////////////////////////////////////////
+
+class RecordingReaderBase
+{
+ RecordingReaderBase(const RecordingReaderBase&) = delete;
+ RecordingReaderBase& operator=(const RecordingReaderBase&) = delete;
+
+public:
+ RecordingReaderBase() = default;
+ virtual ~RecordingReaderBase() = 0;
+ virtual std::unordered_map<IoHash, CbPackage> ReadWorkers() = 0;
+ virtual void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism) = 0;
+ virtual size_t GetActionCount() const = 0;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Reader for recordings done via the zencompute recording system, which
+ * have a shared chunk store and a log of actions with pointers into the
+ * chunk store for their data.
+ */
+class RecordingReader : public RecordingReaderBase, public ChunkResolver
+{
+public:
+ explicit RecordingReader(const std::filesystem::path& RecordingPath);
+ ~RecordingReader();
+
+ virtual std::unordered_map<zen::IoHash, zen::CbPackage> ReadWorkers() override;
+
+ virtual void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback,
+ int TargetParallelism) override;
+ virtual size_t GetActionCount() const override;
+
+private:
+ std::filesystem::path m_RecordingLogDir;
+ BasicFile m_WorkerDataFile;
+ BasicFile m_ActionDataFile;
+ GcManager m_Gc;
+ CidStore m_CidStore{m_Gc};
+
+ // ChunkResolver interface
+ virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+
+ struct ActionEntry
+ {
+ IoHash ActionId;
+ uint64_t Offset;
+ uint64_t Size;
+ };
+
+ std::vector<ActionEntry> m_Actions;
+
+ void ScanActions();
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+struct LocalResolver : public ChunkResolver
+{
+ LocalResolver(const LocalResolver&) = delete;
+ LocalResolver& operator=(const LocalResolver&) = delete;
+
+ LocalResolver() = default;
+ ~LocalResolver() = default;
+
+ virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+ void Add(const IoHash& Cid, IoBuffer Data);
+
+private:
+ RwLock MapLock;
+ std::unordered_map<IoHash, IoBuffer> Attachments;
+};
+
+/**
+ * This is a reader for UE/DDB recordings, which have a different layout on
+ * disk (no shared chunk store)
+ */
+class UeRecordingReader : public RecordingReaderBase, public ChunkResolver
+{
+public:
+ explicit UeRecordingReader(const std::filesystem::path& RecordingPath);
+ ~UeRecordingReader();
+
+ virtual std::unordered_map<zen::IoHash, zen::CbPackage> ReadWorkers() override;
+ virtual void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback,
+ int TargetParallelism) override;
+ virtual size_t GetActionCount() const override;
+ virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+
+private:
+ std::filesystem::path m_RecordingDir;
+ LocalResolver m_LocalResolver;
+ std::vector<std::filesystem::path> m_WorkDirs;
+
+ CbPackage ReadAction(std::filesystem::path WorkDir);
+};
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/include/zencompute/zencompute.h b/src/zencompute/include/zencompute/zencompute.h
new file mode 100644
index 000000000..6dc32eeea
--- /dev/null
+++ b/src/zencompute/include/zencompute/zencompute.h
@@ -0,0 +1,11 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+namespace zen {
+
+void zencompute_forcelinktests();
+
+}