diff options
Diffstat (limited to 'src/zencompute/recordingreader.cpp')
| -rw-r--r-- | src/zencompute/recordingreader.cpp | 335 |
1 files changed, 0 insertions, 335 deletions
diff --git a/src/zencompute/recordingreader.cpp b/src/zencompute/recordingreader.cpp deleted file mode 100644 index 1c1a119cf..000000000 --- a/src/zencompute/recordingreader.cpp +++ /dev/null @@ -1,335 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "zencompute/recordingreader.h" - -#include <zencore/compactbinary.h> -#include <zencore/compactbinaryfile.h> -#include <zencore/compactbinaryvalue.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> - -#if ZEN_PLATFORM_WINDOWS -# include <ppl.h> -# define ZEN_CONCRT_AVAILABLE 1 -#else -# define ZEN_CONCRT_AVAILABLE 0 -#endif - -#if ZEN_WITH_COMPUTE_SERVICES - -namespace zen::compute { - -using namespace std::literals; - -////////////////////////////////////////////////////////////////////////// - -# if ZEN_PLATFORM_WINDOWS -# define ZEN_BUILD_ACTION L"Build.action" -# define ZEN_WORKER_UCB L"worker.ucb" -# else -# define ZEN_BUILD_ACTION "Build.action" -# define ZEN_WORKER_UCB "worker.ucb" -# endif - -////////////////////////////////////////////////////////////////////////// - -struct RecordingTreeVisitor : public FileSystemTraversal::TreeVisitor -{ - virtual void VisitFile(const std::filesystem::path& Parent, - const path_view& File, - uint64_t FileSize, - uint32_t NativeModeOrAttributes, - uint64_t NativeModificationTick) - { - ZEN_UNUSED(Parent, File, FileSize, NativeModeOrAttributes, NativeModificationTick); - - if (File.compare(path_view(ZEN_BUILD_ACTION)) == 0) - { - WorkDirs.push_back(Parent); - } - else if (File.compare(path_view(ZEN_WORKER_UCB)) == 0) - { - WorkerDirs.push_back(Parent); - } - } - - virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName, uint32_t NativeModeOrAttributes) - { - ZEN_UNUSED(Parent, DirectoryName, NativeModeOrAttributes); - - return true; - } - - std::vector<std::filesystem::path> WorkerDirs; - std::vector<std::filesystem::path> WorkDirs; -}; - -////////////////////////////////////////////////////////////////////////// - -void -IterateOverArray(auto Array, auto Func, int TargetParallelism) -{ -# if ZEN_CONCRT_AVAILABLE - if (TargetParallelism > 1) - { - concurrency::simple_partitioner Chunker(Array.size() / TargetParallelism); - concurrency::parallel_for_each(begin(Array), end(Array), [&](const auto& Item) { Func(Item); }); - - return; - } -# else - ZEN_UNUSED(TargetParallelism); -# endif - - for (const auto& Item : Array) - { - Func(Item); - } -} - -////////////////////////////////////////////////////////////////////////// - -RecordingReaderBase::~RecordingReaderBase() = default; - -////////////////////////////////////////////////////////////////////////// - -RecordingReader::RecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingLogDir(RecordingPath) -{ - CidStoreConfiguration CidConfig; - CidConfig.RootDirectory = m_RecordingLogDir / "cid"; - CidConfig.HugeValueThreshold = 128 * 1024 * 1024; - - m_CidStore.Initialize(CidConfig); -} - -RecordingReader::~RecordingReader() -{ - m_CidStore.Flush(); -} - -size_t -RecordingReader::GetActionCount() const -{ - return m_Actions.size(); -} - -IoBuffer -RecordingReader::FindChunkByCid(const IoHash& DecompressedId) -{ - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(DecompressedId)) - { - return Chunk; - } - - ZEN_ERROR("failed lookup of chunk with CID '{}'", DecompressedId); - - return {}; -} - -std::unordered_map<zen::IoHash, zen::CbPackage> -RecordingReader::ReadWorkers() -{ - std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap; - - { - CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "workers.ztoc"); - CbObject Toc = TocFile.Object; - - m_WorkerDataFile.Open(m_RecordingLogDir / "workers.zdat", BasicFile::Mode::kRead); - - ZEN_ASSERT(Toc["version"sv].AsInt32() == 1); - - for (auto& It : Toc["toc"]) - { - CbArrayView Entry = It.AsArrayView(); - CbFieldViewIterator Vit = Entry.CreateViewIterator(); - - const IoHash WorkerId = Vit++->AsHash(); - const uint64_t Offset = Vit++->AsInt64(0); - const uint64_t Size = Vit++->AsInt64(0); - - IoBuffer WorkerRange = m_WorkerDataFile.ReadRange(Offset, Size); - CbObject WorkerDesc = LoadCompactBinaryObject(WorkerRange); - CbPackage& WorkerPkg = WorkerMap[WorkerId]; - WorkerPkg.SetObject(WorkerDesc); - - WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) { - const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); - IoBuffer AttachmentData = m_CidStore.FindChunkByCid(AttachmentCid); - - if (AttachmentData) - { - IoHash RawHash; - uint64_t RawSize = 0; - CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); - WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash)); - } - }); - } - } - - // Scan actions as well (this should be called separately, ideally) - - ScanActions(); - - return WorkerMap; -} - -void -RecordingReader::ScanActions() -{ - CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "actions.ztoc"); - CbObject Toc = TocFile.Object; - - m_ActionDataFile.Open(m_RecordingLogDir / "actions.zdat", BasicFile::Mode::kRead); - - ZEN_ASSERT(Toc["version"sv].AsInt32() == 1); - - for (auto& It : Toc["toc"]) - { - CbArrayView ArrayEntry = It.AsArrayView(); - CbFieldViewIterator Vit = ArrayEntry.CreateViewIterator(); - - ActionEntry Entry; - Entry.ActionId = Vit++->AsHash(); - Entry.Offset = Vit++->AsInt64(0); - Entry.Size = Vit++->AsInt64(0); - - m_Actions.push_back(Entry); - } -} - -void -RecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism) -{ - IterateOverArray( - m_Actions, - [&](const ActionEntry& Entry) { - CbObject ActionDesc = LoadCompactBinaryObject(m_ActionDataFile.ReadRange(Entry.Offset, Entry.Size)); - - Callback(ActionDesc, Entry.ActionId); - }, - TargetParallelism); -} - -////////////////////////////////////////////////////////////////////////// - -IoBuffer -LocalResolver::FindChunkByCid(const IoHash& DecompressedId) -{ - RwLock::SharedLockScope _(MapLock); - if (auto It = Attachments.find(DecompressedId); It != Attachments.end()) - { - return It->second; - } - - return {}; -} - -void -LocalResolver::Add(const IoHash& Cid, IoBuffer Data) -{ - RwLock::ExclusiveLockScope _(MapLock); - Data.SetContentType(ZenContentType::kCompressedBinary); - Attachments[Cid] = Data; -} - -/// - -UeRecordingReader::UeRecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingDir(RecordingPath) -{ -} - -UeRecordingReader::~UeRecordingReader() -{ -} - -size_t -UeRecordingReader::GetActionCount() const -{ - return m_WorkDirs.size(); -} - -IoBuffer -UeRecordingReader::FindChunkByCid(const IoHash& DecompressedId) -{ - return m_LocalResolver.FindChunkByCid(DecompressedId); -} - -std::unordered_map<zen::IoHash, zen::CbPackage> -UeRecordingReader::ReadWorkers() -{ - std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap; - - FileSystemTraversal Traversal; - RecordingTreeVisitor Visitor; - Traversal.TraverseFileSystem(m_RecordingDir, Visitor); - - m_WorkDirs = std::move(Visitor.WorkDirs); - - for (const std::filesystem::path& WorkerDir : Visitor.WorkerDirs) - { - CbObjectFromFile WorkerFile = LoadCompactBinaryObject(WorkerDir / "worker.ucb"); - CbObject WorkerDesc = WorkerFile.Object; - const IoHash& WorkerId = WorkerFile.Hash; - CbPackage& WorkerPkg = WorkerMap[WorkerId]; - WorkerPkg.SetObject(WorkerDesc); - - WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) { - const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); - IoBuffer AttachmentData = ReadFile(WorkerDir / "chunks" / AttachmentCid.ToHexString()).Flatten(); - IoHash RawHash; - uint64_t RawSize = 0; - CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); - WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash)); - }); - } - - return WorkerMap; -} - -void -UeRecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int ParallelismTarget) -{ - IterateOverArray( - m_WorkDirs, - [&](const std::filesystem::path& WorkDir) { - CbPackage WorkPackage = ReadAction(WorkDir); - CbObject ActionObject = WorkPackage.GetObject(); - const IoHash& ActionId = WorkPackage.GetObjectHash(); - - Callback(ActionObject, ActionId); - }, - ParallelismTarget); -} - -CbPackage -UeRecordingReader::ReadAction(std::filesystem::path WorkDir) -{ - CbPackage WorkPackage; - std::filesystem::path WorkDescPath = WorkDir / "Build.action"; - CbObjectFromFile ActionFile = LoadCompactBinaryObject(WorkDescPath); - CbObject& ActionObject = ActionFile.Object; - - WorkPackage.SetObject(ActionObject); - - ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { - const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); - IoBuffer AttachmentData = ReadFile(WorkDir / "inputs" / AttachmentCid.ToHexString()).Flatten(); - - m_LocalResolver.Add(AttachmentCid, AttachmentData); - - IoHash RawHash; - uint64_t RawSize = 0; - CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); - ZEN_ASSERT(AttachmentCid == RawHash); - WorkPackage.AddAttachment(CbAttachment(CompressedData, RawHash)); - }); - - return WorkPackage; -} - -} // namespace zen::compute - -#endif // ZEN_WITH_COMPUTE_SERVICES |