// Copyright Epic Games, Inc. All Rights Reserved. #include "zencompute/recordingreader.h" #include #include #include #include #include #include #if ZEN_PLATFORM_WINDOWS # include # define ZEN_CONCRT_AVAILABLE 1 #else # define ZEN_CONCRT_AVAILABLE 0 #endif #if ZEN_WITH_COMPUTE_SERVICES namespace zen::compute { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// # if ZEN_PLATFORM_WINDOWS # define ZEN_BUILD_ACTION L"Build.action" # define ZEN_WORKER_UCB L"worker.ucb" # else # define ZEN_BUILD_ACTION "Build.action" # define ZEN_WORKER_UCB "worker.ucb" # endif ////////////////////////////////////////////////////////////////////////// struct RecordingTreeVisitor : public FileSystemTraversal::TreeVisitor { virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t NativeModeOrAttributes, uint64_t NativeModificationTick) { ZEN_UNUSED(Parent, File, FileSize, NativeModeOrAttributes, NativeModificationTick); if (File.compare(path_view(ZEN_BUILD_ACTION)) == 0) { WorkDirs.push_back(Parent); } else if (File.compare(path_view(ZEN_WORKER_UCB)) == 0) { WorkerDirs.push_back(Parent); } } virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName, uint32_t NativeModeOrAttributes) { ZEN_UNUSED(Parent, DirectoryName, NativeModeOrAttributes); return true; } std::vector WorkerDirs; std::vector WorkDirs; }; ////////////////////////////////////////////////////////////////////////// void IterateOverArray(auto Array, auto Func, int TargetParallelism) { # if ZEN_CONCRT_AVAILABLE if (TargetParallelism > 1) { concurrency::simple_partitioner Chunker(Array.size() / TargetParallelism); concurrency::parallel_for_each(begin(Array), end(Array), [&](const auto& Item) { Func(Item); }); return; } # else ZEN_UNUSED(TargetParallelism); # endif for (const auto& Item : Array) { Func(Item); } } ////////////////////////////////////////////////////////////////////////// RecordingReaderBase::~RecordingReaderBase() = default; ////////////////////////////////////////////////////////////////////////// RecordingReader::RecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingLogDir(RecordingPath) { CidStoreConfiguration CidConfig; CidConfig.RootDirectory = m_RecordingLogDir / "cid"; CidConfig.HugeValueThreshold = 128 * 1024 * 1024; m_CidStore.Initialize(CidConfig); } RecordingReader::~RecordingReader() { m_CidStore.Flush(); } size_t RecordingReader::GetActionCount() const { return m_Actions.size(); } IoBuffer RecordingReader::FindChunkByCid(const IoHash& DecompressedId) { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(DecompressedId)) { return Chunk; } ZEN_ERROR("failed lookup of chunk with CID '{}'", DecompressedId); return {}; } std::unordered_map RecordingReader::ReadWorkers() { std::unordered_map WorkerMap; { CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "workers.ztoc"); CbObject Toc = TocFile.Object; m_WorkerDataFile.Open(m_RecordingLogDir / "workers.zdat", BasicFile::Mode::kRead); ZEN_ASSERT(Toc["version"sv].AsInt32() == 1); for (auto& It : Toc["toc"]) { CbArrayView Entry = It.AsArrayView(); CbFieldViewIterator Vit = Entry.CreateViewIterator(); const IoHash WorkerId = Vit++->AsHash(); const uint64_t Offset = Vit++->AsInt64(0); const uint64_t Size = Vit++->AsInt64(0); IoBuffer WorkerRange = m_WorkerDataFile.ReadRange(Offset, Size); CbObject WorkerDesc = LoadCompactBinaryObject(WorkerRange); CbPackage& WorkerPkg = WorkerMap[WorkerId]; WorkerPkg.SetObject(WorkerDesc); WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) { const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); IoBuffer AttachmentData = m_CidStore.FindChunkByCid(AttachmentCid); if (AttachmentData) { IoHash RawHash; uint64_t RawSize = 0; CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash)); } }); } } // Scan actions as well (this should be called separately, ideally) ScanActions(); return WorkerMap; } void RecordingReader::ScanActions() { CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "actions.ztoc"); CbObject Toc = TocFile.Object; m_ActionDataFile.Open(m_RecordingLogDir / "actions.zdat", BasicFile::Mode::kRead); ZEN_ASSERT(Toc["version"sv].AsInt32() == 1); for (auto& It : Toc["toc"]) { CbArrayView ArrayEntry = It.AsArrayView(); CbFieldViewIterator Vit = ArrayEntry.CreateViewIterator(); ActionEntry Entry; Entry.ActionId = Vit++->AsHash(); Entry.Offset = Vit++->AsInt64(0); Entry.Size = Vit++->AsInt64(0); m_Actions.push_back(Entry); } } void RecordingReader::IterateActions(std::function&& Callback, int TargetParallelism) { IterateOverArray( m_Actions, [&](const ActionEntry& Entry) { CbObject ActionDesc = LoadCompactBinaryObject(m_ActionDataFile.ReadRange(Entry.Offset, Entry.Size)); Callback(ActionDesc, Entry.ActionId); }, TargetParallelism); } ////////////////////////////////////////////////////////////////////////// IoBuffer LocalResolver::FindChunkByCid(const IoHash& DecompressedId) { RwLock::SharedLockScope _(MapLock); if (auto It = Attachments.find(DecompressedId); It != Attachments.end()) { return It->second; } return {}; } void LocalResolver::Add(const IoHash& Cid, IoBuffer Data) { RwLock::ExclusiveLockScope _(MapLock); Data.SetContentType(ZenContentType::kCompressedBinary); Attachments[Cid] = Data; } /// UeRecordingReader::UeRecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingDir(RecordingPath) { } UeRecordingReader::~UeRecordingReader() { } size_t UeRecordingReader::GetActionCount() const { return m_WorkDirs.size(); } IoBuffer UeRecordingReader::FindChunkByCid(const IoHash& DecompressedId) { return m_LocalResolver.FindChunkByCid(DecompressedId); } std::unordered_map UeRecordingReader::ReadWorkers() { std::unordered_map WorkerMap; FileSystemTraversal Traversal; RecordingTreeVisitor Visitor; Traversal.TraverseFileSystem(m_RecordingDir, Visitor); m_WorkDirs = std::move(Visitor.WorkDirs); for (const std::filesystem::path& WorkerDir : Visitor.WorkerDirs) { CbObjectFromFile WorkerFile = LoadCompactBinaryObject(WorkerDir / "worker.ucb"); CbObject WorkerDesc = WorkerFile.Object; const IoHash& WorkerId = WorkerFile.Hash; CbPackage& WorkerPkg = WorkerMap[WorkerId]; WorkerPkg.SetObject(WorkerDesc); WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) { const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); IoBuffer AttachmentData = ReadFile(WorkerDir / "chunks" / AttachmentCid.ToHexString()).Flatten(); IoHash RawHash; uint64_t RawSize = 0; CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash)); }); } return WorkerMap; } void UeRecordingReader::IterateActions(std::function&& Callback, int ParallelismTarget) { IterateOverArray( m_WorkDirs, [&](const std::filesystem::path& WorkDir) { CbPackage WorkPackage = ReadAction(WorkDir); CbObject ActionObject = WorkPackage.GetObject(); const IoHash& ActionId = WorkPackage.GetObjectHash(); Callback(ActionObject, ActionId); }, ParallelismTarget); } CbPackage UeRecordingReader::ReadAction(std::filesystem::path WorkDir) { CbPackage WorkPackage; std::filesystem::path WorkDescPath = WorkDir / "Build.action"; CbObjectFromFile ActionFile = LoadCompactBinaryObject(WorkDescPath); CbObject& ActionObject = ActionFile.Object; WorkPackage.SetObject(ActionObject); ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); IoBuffer AttachmentData = ReadFile(WorkDir / "inputs" / AttachmentCid.ToHexString()).Flatten(); m_LocalResolver.Add(AttachmentCid, AttachmentData); IoHash RawHash; uint64_t RawSize = 0; CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); ZEN_ASSERT(AttachmentCid == RawHash); WorkPackage.AddAttachment(CbAttachment(CompressedData, RawHash)); }); return WorkPackage; } } // namespace zen::compute #endif // ZEN_WITH_COMPUTE_SERVICES