aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/recordingreader.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-02-18 11:28:03 +0100
committerGitHub Enterprise <[email protected]>2026-02-18 11:28:03 +0100
commit149a5c2faa8d59290b8b44717e504532e906aae2 (patch)
tree9c875f1fd89f65f939bf8f6ef67b506565be845c /src/zencompute/recordingreader.cpp
parentadd selective request logging support to http.sys (#762) (diff)
downloadzen-149a5c2faa8d59290b8b44717e504532e906aae2.tar.xz
zen-149a5c2faa8d59290b8b44717e504532e906aae2.zip
structured compute basics (#714)
this change adds the `zencompute` component, which can be used to distribute work dispatched from UE using the DDB (Derived Data Build) APIs via zenserver this change also adds a distinct zenserver compute mode (`zenserver compute`) which is intended to be used for leaf compute nodes to exercise the compute functionality without directly involving UE, a `zen exec` subcommand is also added, which can be used to feed replays through the system all new functionality is considered *experimental* and disabled by default at this time, behind the `zencompute` option in xmake config
Diffstat (limited to 'src/zencompute/recordingreader.cpp')
-rw-r--r--src/zencompute/recordingreader.cpp335
1 files changed, 335 insertions, 0 deletions
diff --git a/src/zencompute/recordingreader.cpp b/src/zencompute/recordingreader.cpp
new file mode 100644
index 000000000..1c1a119cf
--- /dev/null
+++ b/src/zencompute/recordingreader.cpp
@@ -0,0 +1,335 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/recordingreader.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinaryfile.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <ppl.h>
+# define ZEN_CONCRT_AVAILABLE 1
+#else
+# define ZEN_CONCRT_AVAILABLE 0
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+# if ZEN_PLATFORM_WINDOWS
+# define ZEN_BUILD_ACTION L"Build.action"
+# define ZEN_WORKER_UCB L"worker.ucb"
+# else
+# define ZEN_BUILD_ACTION "Build.action"
+# define ZEN_WORKER_UCB "worker.ucb"
+# endif
+
+//////////////////////////////////////////////////////////////////////////
+
+struct RecordingTreeVisitor : public FileSystemTraversal::TreeVisitor
+{
+ virtual void VisitFile(const std::filesystem::path& Parent,
+ const path_view& File,
+ uint64_t FileSize,
+ uint32_t NativeModeOrAttributes,
+ uint64_t NativeModificationTick)
+ {
+ ZEN_UNUSED(Parent, File, FileSize, NativeModeOrAttributes, NativeModificationTick);
+
+ if (File.compare(path_view(ZEN_BUILD_ACTION)) == 0)
+ {
+ WorkDirs.push_back(Parent);
+ }
+ else if (File.compare(path_view(ZEN_WORKER_UCB)) == 0)
+ {
+ WorkerDirs.push_back(Parent);
+ }
+ }
+
+ virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName, uint32_t NativeModeOrAttributes)
+ {
+ ZEN_UNUSED(Parent, DirectoryName, NativeModeOrAttributes);
+
+ return true;
+ }
+
+ std::vector<std::filesystem::path> WorkerDirs;
+ std::vector<std::filesystem::path> WorkDirs;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+IterateOverArray(auto Array, auto Func, int TargetParallelism)
+{
+# if ZEN_CONCRT_AVAILABLE
+ if (TargetParallelism > 1)
+ {
+ concurrency::simple_partitioner Chunker(Array.size() / TargetParallelism);
+ concurrency::parallel_for_each(begin(Array), end(Array), [&](const auto& Item) { Func(Item); });
+
+ return;
+ }
+# else
+ ZEN_UNUSED(TargetParallelism);
+# endif
+
+ for (const auto& Item : Array)
+ {
+ Func(Item);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+RecordingReaderBase::~RecordingReaderBase() = default;
+
+//////////////////////////////////////////////////////////////////////////
+
+RecordingReader::RecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingLogDir(RecordingPath)
+{
+ CidStoreConfiguration CidConfig;
+ CidConfig.RootDirectory = m_RecordingLogDir / "cid";
+ CidConfig.HugeValueThreshold = 128 * 1024 * 1024;
+
+ m_CidStore.Initialize(CidConfig);
+}
+
+RecordingReader::~RecordingReader()
+{
+ m_CidStore.Flush();
+}
+
+size_t
+RecordingReader::GetActionCount() const
+{
+ return m_Actions.size();
+}
+
+IoBuffer
+RecordingReader::FindChunkByCid(const IoHash& DecompressedId)
+{
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(DecompressedId))
+ {
+ return Chunk;
+ }
+
+ ZEN_ERROR("failed lookup of chunk with CID '{}'", DecompressedId);
+
+ return {};
+}
+
+std::unordered_map<zen::IoHash, zen::CbPackage>
+RecordingReader::ReadWorkers()
+{
+ std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap;
+
+ {
+ CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "workers.ztoc");
+ CbObject Toc = TocFile.Object;
+
+ m_WorkerDataFile.Open(m_RecordingLogDir / "workers.zdat", BasicFile::Mode::kRead);
+
+ ZEN_ASSERT(Toc["version"sv].AsInt32() == 1);
+
+ for (auto& It : Toc["toc"])
+ {
+ CbArrayView Entry = It.AsArrayView();
+ CbFieldViewIterator Vit = Entry.CreateViewIterator();
+
+ const IoHash WorkerId = Vit++->AsHash();
+ const uint64_t Offset = Vit++->AsInt64(0);
+ const uint64_t Size = Vit++->AsInt64(0);
+
+ IoBuffer WorkerRange = m_WorkerDataFile.ReadRange(Offset, Size);
+ CbObject WorkerDesc = LoadCompactBinaryObject(WorkerRange);
+ CbPackage& WorkerPkg = WorkerMap[WorkerId];
+ WorkerPkg.SetObject(WorkerDesc);
+
+ WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+ const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
+ IoBuffer AttachmentData = m_CidStore.FindChunkByCid(AttachmentCid);
+
+ if (AttachmentData)
+ {
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
+ WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash));
+ }
+ });
+ }
+ }
+
+ // Scan actions as well (this should be called separately, ideally)
+
+ ScanActions();
+
+ return WorkerMap;
+}
+
+void
+RecordingReader::ScanActions()
+{
+ CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "actions.ztoc");
+ CbObject Toc = TocFile.Object;
+
+ m_ActionDataFile.Open(m_RecordingLogDir / "actions.zdat", BasicFile::Mode::kRead);
+
+ ZEN_ASSERT(Toc["version"sv].AsInt32() == 1);
+
+ for (auto& It : Toc["toc"])
+ {
+ CbArrayView ArrayEntry = It.AsArrayView();
+ CbFieldViewIterator Vit = ArrayEntry.CreateViewIterator();
+
+ ActionEntry Entry;
+ Entry.ActionId = Vit++->AsHash();
+ Entry.Offset = Vit++->AsInt64(0);
+ Entry.Size = Vit++->AsInt64(0);
+
+ m_Actions.push_back(Entry);
+ }
+}
+
+void
+RecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism)
+{
+ IterateOverArray(
+ m_Actions,
+ [&](const ActionEntry& Entry) {
+ CbObject ActionDesc = LoadCompactBinaryObject(m_ActionDataFile.ReadRange(Entry.Offset, Entry.Size));
+
+ Callback(ActionDesc, Entry.ActionId);
+ },
+ TargetParallelism);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+IoBuffer
+LocalResolver::FindChunkByCid(const IoHash& DecompressedId)
+{
+ RwLock::SharedLockScope _(MapLock);
+ if (auto It = Attachments.find(DecompressedId); It != Attachments.end())
+ {
+ return It->second;
+ }
+
+ return {};
+}
+
+void
+LocalResolver::Add(const IoHash& Cid, IoBuffer Data)
+{
+ RwLock::ExclusiveLockScope _(MapLock);
+ Data.SetContentType(ZenContentType::kCompressedBinary);
+ Attachments[Cid] = Data;
+}
+
+///
+
+UeRecordingReader::UeRecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingDir(RecordingPath)
+{
+}
+
+UeRecordingReader::~UeRecordingReader()
+{
+}
+
+size_t
+UeRecordingReader::GetActionCount() const
+{
+ return m_WorkDirs.size();
+}
+
+IoBuffer
+UeRecordingReader::FindChunkByCid(const IoHash& DecompressedId)
+{
+ return m_LocalResolver.FindChunkByCid(DecompressedId);
+}
+
+std::unordered_map<zen::IoHash, zen::CbPackage>
+UeRecordingReader::ReadWorkers()
+{
+ std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap;
+
+ FileSystemTraversal Traversal;
+ RecordingTreeVisitor Visitor;
+ Traversal.TraverseFileSystem(m_RecordingDir, Visitor);
+
+ m_WorkDirs = std::move(Visitor.WorkDirs);
+
+ for (const std::filesystem::path& WorkerDir : Visitor.WorkerDirs)
+ {
+ CbObjectFromFile WorkerFile = LoadCompactBinaryObject(WorkerDir / "worker.ucb");
+ CbObject WorkerDesc = WorkerFile.Object;
+ const IoHash& WorkerId = WorkerFile.Hash;
+ CbPackage& WorkerPkg = WorkerMap[WorkerId];
+ WorkerPkg.SetObject(WorkerDesc);
+
+ WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+ const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
+ IoBuffer AttachmentData = ReadFile(WorkerDir / "chunks" / AttachmentCid.ToHexString()).Flatten();
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
+ WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash));
+ });
+ }
+
+ return WorkerMap;
+}
+
+void
+UeRecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int ParallelismTarget)
+{
+ IterateOverArray(
+ m_WorkDirs,
+ [&](const std::filesystem::path& WorkDir) {
+ CbPackage WorkPackage = ReadAction(WorkDir);
+ CbObject ActionObject = WorkPackage.GetObject();
+ const IoHash& ActionId = WorkPackage.GetObjectHash();
+
+ Callback(ActionObject, ActionId);
+ },
+ ParallelismTarget);
+}
+
+CbPackage
+UeRecordingReader::ReadAction(std::filesystem::path WorkDir)
+{
+ CbPackage WorkPackage;
+ std::filesystem::path WorkDescPath = WorkDir / "Build.action";
+ CbObjectFromFile ActionFile = LoadCompactBinaryObject(WorkDescPath);
+ CbObject& ActionObject = ActionFile.Object;
+
+ WorkPackage.SetObject(ActionObject);
+
+ ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+ const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
+ IoBuffer AttachmentData = ReadFile(WorkDir / "inputs" / AttachmentCid.ToHexString()).Flatten();
+
+ m_LocalResolver.Add(AttachmentCid, AttachmentData);
+
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
+ ZEN_ASSERT(AttachmentCid == RawHash);
+ WorkPackage.AddAttachment(CbAttachment(CompressedData, RawHash));
+ });
+
+ return WorkPackage;
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES