aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/recordingreader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/recordingreader.cpp')
-rw-r--r--src/zencompute/recordingreader.cpp335
1 files changed, 0 insertions, 335 deletions
diff --git a/src/zencompute/recordingreader.cpp b/src/zencompute/recordingreader.cpp
deleted file mode 100644
index 1c1a119cf..000000000
--- a/src/zencompute/recordingreader.cpp
+++ /dev/null
@@ -1,335 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "zencompute/recordingreader.h"
-
-#include <zencore/compactbinary.h>
-#include <zencore/compactbinaryfile.h>
-#include <zencore/compactbinaryvalue.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-
-#if ZEN_PLATFORM_WINDOWS
-# include <ppl.h>
-# define ZEN_CONCRT_AVAILABLE 1
-#else
-# define ZEN_CONCRT_AVAILABLE 0
-#endif
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-namespace zen::compute {
-
-using namespace std::literals;
-
-//////////////////////////////////////////////////////////////////////////
-
-# if ZEN_PLATFORM_WINDOWS
-# define ZEN_BUILD_ACTION L"Build.action"
-# define ZEN_WORKER_UCB L"worker.ucb"
-# else
-# define ZEN_BUILD_ACTION "Build.action"
-# define ZEN_WORKER_UCB "worker.ucb"
-# endif
-
-//////////////////////////////////////////////////////////////////////////
-
-struct RecordingTreeVisitor : public FileSystemTraversal::TreeVisitor
-{
- virtual void VisitFile(const std::filesystem::path& Parent,
- const path_view& File,
- uint64_t FileSize,
- uint32_t NativeModeOrAttributes,
- uint64_t NativeModificationTick)
- {
- ZEN_UNUSED(Parent, File, FileSize, NativeModeOrAttributes, NativeModificationTick);
-
- if (File.compare(path_view(ZEN_BUILD_ACTION)) == 0)
- {
- WorkDirs.push_back(Parent);
- }
- else if (File.compare(path_view(ZEN_WORKER_UCB)) == 0)
- {
- WorkerDirs.push_back(Parent);
- }
- }
-
- virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName, uint32_t NativeModeOrAttributes)
- {
- ZEN_UNUSED(Parent, DirectoryName, NativeModeOrAttributes);
-
- return true;
- }
-
- std::vector<std::filesystem::path> WorkerDirs;
- std::vector<std::filesystem::path> WorkDirs;
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-void
-IterateOverArray(auto Array, auto Func, int TargetParallelism)
-{
-# if ZEN_CONCRT_AVAILABLE
- if (TargetParallelism > 1)
- {
- concurrency::simple_partitioner Chunker(Array.size() / TargetParallelism);
- concurrency::parallel_for_each(begin(Array), end(Array), [&](const auto& Item) { Func(Item); });
-
- return;
- }
-# else
- ZEN_UNUSED(TargetParallelism);
-# endif
-
- for (const auto& Item : Array)
- {
- Func(Item);
- }
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-RecordingReaderBase::~RecordingReaderBase() = default;
-
-//////////////////////////////////////////////////////////////////////////
-
-RecordingReader::RecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingLogDir(RecordingPath)
-{
- CidStoreConfiguration CidConfig;
- CidConfig.RootDirectory = m_RecordingLogDir / "cid";
- CidConfig.HugeValueThreshold = 128 * 1024 * 1024;
-
- m_CidStore.Initialize(CidConfig);
-}
-
-RecordingReader::~RecordingReader()
-{
- m_CidStore.Flush();
-}
-
-size_t
-RecordingReader::GetActionCount() const
-{
- return m_Actions.size();
-}
-
-IoBuffer
-RecordingReader::FindChunkByCid(const IoHash& DecompressedId)
-{
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(DecompressedId))
- {
- return Chunk;
- }
-
- ZEN_ERROR("failed lookup of chunk with CID '{}'", DecompressedId);
-
- return {};
-}
-
-std::unordered_map<zen::IoHash, zen::CbPackage>
-RecordingReader::ReadWorkers()
-{
- std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap;
-
- {
- CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "workers.ztoc");
- CbObject Toc = TocFile.Object;
-
- m_WorkerDataFile.Open(m_RecordingLogDir / "workers.zdat", BasicFile::Mode::kRead);
-
- ZEN_ASSERT(Toc["version"sv].AsInt32() == 1);
-
- for (auto& It : Toc["toc"])
- {
- CbArrayView Entry = It.AsArrayView();
- CbFieldViewIterator Vit = Entry.CreateViewIterator();
-
- const IoHash WorkerId = Vit++->AsHash();
- const uint64_t Offset = Vit++->AsInt64(0);
- const uint64_t Size = Vit++->AsInt64(0);
-
- IoBuffer WorkerRange = m_WorkerDataFile.ReadRange(Offset, Size);
- CbObject WorkerDesc = LoadCompactBinaryObject(WorkerRange);
- CbPackage& WorkerPkg = WorkerMap[WorkerId];
- WorkerPkg.SetObject(WorkerDesc);
-
- WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
- const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
- IoBuffer AttachmentData = m_CidStore.FindChunkByCid(AttachmentCid);
-
- if (AttachmentData)
- {
- IoHash RawHash;
- uint64_t RawSize = 0;
- CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
- WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash));
- }
- });
- }
- }
-
- // Scan actions as well (this should be called separately, ideally)
-
- ScanActions();
-
- return WorkerMap;
-}
-
-void
-RecordingReader::ScanActions()
-{
- CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "actions.ztoc");
- CbObject Toc = TocFile.Object;
-
- m_ActionDataFile.Open(m_RecordingLogDir / "actions.zdat", BasicFile::Mode::kRead);
-
- ZEN_ASSERT(Toc["version"sv].AsInt32() == 1);
-
- for (auto& It : Toc["toc"])
- {
- CbArrayView ArrayEntry = It.AsArrayView();
- CbFieldViewIterator Vit = ArrayEntry.CreateViewIterator();
-
- ActionEntry Entry;
- Entry.ActionId = Vit++->AsHash();
- Entry.Offset = Vit++->AsInt64(0);
- Entry.Size = Vit++->AsInt64(0);
-
- m_Actions.push_back(Entry);
- }
-}
-
-void
-RecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism)
-{
- IterateOverArray(
- m_Actions,
- [&](const ActionEntry& Entry) {
- CbObject ActionDesc = LoadCompactBinaryObject(m_ActionDataFile.ReadRange(Entry.Offset, Entry.Size));
-
- Callback(ActionDesc, Entry.ActionId);
- },
- TargetParallelism);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-IoBuffer
-LocalResolver::FindChunkByCid(const IoHash& DecompressedId)
-{
- RwLock::SharedLockScope _(MapLock);
- if (auto It = Attachments.find(DecompressedId); It != Attachments.end())
- {
- return It->second;
- }
-
- return {};
-}
-
-void
-LocalResolver::Add(const IoHash& Cid, IoBuffer Data)
-{
- RwLock::ExclusiveLockScope _(MapLock);
- Data.SetContentType(ZenContentType::kCompressedBinary);
- Attachments[Cid] = Data;
-}
-
-///
-
-UeRecordingReader::UeRecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingDir(RecordingPath)
-{
-}
-
-UeRecordingReader::~UeRecordingReader()
-{
-}
-
-size_t
-UeRecordingReader::GetActionCount() const
-{
- return m_WorkDirs.size();
-}
-
-IoBuffer
-UeRecordingReader::FindChunkByCid(const IoHash& DecompressedId)
-{
- return m_LocalResolver.FindChunkByCid(DecompressedId);
-}
-
-std::unordered_map<zen::IoHash, zen::CbPackage>
-UeRecordingReader::ReadWorkers()
-{
- std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap;
-
- FileSystemTraversal Traversal;
- RecordingTreeVisitor Visitor;
- Traversal.TraverseFileSystem(m_RecordingDir, Visitor);
-
- m_WorkDirs = std::move(Visitor.WorkDirs);
-
- for (const std::filesystem::path& WorkerDir : Visitor.WorkerDirs)
- {
- CbObjectFromFile WorkerFile = LoadCompactBinaryObject(WorkerDir / "worker.ucb");
- CbObject WorkerDesc = WorkerFile.Object;
- const IoHash& WorkerId = WorkerFile.Hash;
- CbPackage& WorkerPkg = WorkerMap[WorkerId];
- WorkerPkg.SetObject(WorkerDesc);
-
- WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
- const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
- IoBuffer AttachmentData = ReadFile(WorkerDir / "chunks" / AttachmentCid.ToHexString()).Flatten();
- IoHash RawHash;
- uint64_t RawSize = 0;
- CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
- WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash));
- });
- }
-
- return WorkerMap;
-}
-
-void
-UeRecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int ParallelismTarget)
-{
- IterateOverArray(
- m_WorkDirs,
- [&](const std::filesystem::path& WorkDir) {
- CbPackage WorkPackage = ReadAction(WorkDir);
- CbObject ActionObject = WorkPackage.GetObject();
- const IoHash& ActionId = WorkPackage.GetObjectHash();
-
- Callback(ActionObject, ActionId);
- },
- ParallelismTarget);
-}
-
-CbPackage
-UeRecordingReader::ReadAction(std::filesystem::path WorkDir)
-{
- CbPackage WorkPackage;
- std::filesystem::path WorkDescPath = WorkDir / "Build.action";
- CbObjectFromFile ActionFile = LoadCompactBinaryObject(WorkDescPath);
- CbObject& ActionObject = ActionFile.Object;
-
- WorkPackage.SetObject(ActionObject);
-
- ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
- const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
- IoBuffer AttachmentData = ReadFile(WorkDir / "inputs" / AttachmentCid.ToHexString()).Flatten();
-
- m_LocalResolver.Add(AttachmentCid, AttachmentData);
-
- IoHash RawHash;
- uint64_t RawSize = 0;
- CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
- ZEN_ASSERT(AttachmentCid == RawHash);
- WorkPackage.AddAttachment(CbAttachment(CompressedData, RawHash));
- });
-
- return WorkPackage;
-}
-
-} // namespace zen::compute
-
-#endif // ZEN_WITH_COMPUTE_SERVICES