// Copyright Epic Games, Inc. All Rights Reserved. #include "actionrecorder.h" #include "functionrunner.h" #include #include #include #include #include #include #if ZEN_PLATFORM_WINDOWS # include # define ZEN_CONCRT_AVAILABLE 1 #else # define ZEN_CONCRT_AVAILABLE 0 #endif #if ZEN_WITH_COMPUTE_SERVICES namespace zen::compute { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// RecordingFileWriter::RecordingFileWriter() { } RecordingFileWriter::~RecordingFileWriter() { Close(); } void RecordingFileWriter::Open(std::filesystem::path FilePath) { using namespace std::literals; m_File.Open(FilePath, BasicFile::Mode::kTruncate); m_File.Write("----DDC2----DATA", 16, 0); m_FileOffset = 16; std::filesystem::path TocPath = FilePath.replace_extension(".ztoc"); m_TocFile.Open(TocPath, BasicFile::Mode::kTruncate); m_TocWriter << "version"sv << 1; m_TocWriter.BeginArray("toc"sv); } void RecordingFileWriter::Close() { m_TocWriter.EndArray(); CbObject Toc = m_TocWriter.Save(); std::error_code Ec; m_TocFile.WriteAll(Toc.GetBuffer().AsIoBuffer(), Ec); } void RecordingFileWriter::AppendObject(const CbObject& Object, const IoHash& ObjectHash) { RwLock::ExclusiveLockScope _(m_FileLock); MemoryView ObjectView = Object.GetBuffer().GetView(); std::error_code Ec; m_File.Write(ObjectView, m_FileOffset, Ec); if (Ec) { throw std::system_error(Ec, "failed writing to archive"); } m_TocWriter.BeginArray(); m_TocWriter.AddHash(ObjectHash); m_TocWriter.AddInteger(m_FileOffset); m_TocWriter.AddInteger(gsl::narrow(ObjectView.GetSize())); m_TocWriter.EndArray(); m_FileOffset += ObjectView.GetSize(); } ////////////////////////////////////////////////////////////////////////// ActionRecorder::ActionRecorder(ChunkResolver& InChunkResolver, const std::filesystem::path& RecordingLogPath) : m_ChunkResolver(InChunkResolver) , m_RecordingLogDir(RecordingLogPath) { std::error_code Ec; CreateDirectories(m_RecordingLogDir, Ec); if (Ec) { ZEN_WARN("Could not create directory '{}': {}", m_RecordingLogDir, Ec.message()); } CleanDirectory(m_RecordingLogDir, /* ForceRemoveReadOnlyFiles */ true, Ec); if (Ec) { ZEN_WARN("Could not clean directory '{}': {}", m_RecordingLogDir, Ec.message()); } m_WorkersFile.Open(m_RecordingLogDir / "workers.zdat"); m_ActionsFile.Open(m_RecordingLogDir / "actions.zdat"); CidStoreConfiguration CidConfig; CidConfig.RootDirectory = m_RecordingLogDir / "cid"; CidConfig.HugeValueThreshold = 128 * 1024 * 1024; m_CidStore.Initialize(CidConfig); } ActionRecorder::~ActionRecorder() { Shutdown(); } void ActionRecorder::Shutdown() { m_CidStore.Flush(); } void ActionRecorder::RegisterWorker(const CbPackage& WorkerPackage) { const IoHash WorkerId = WorkerPackage.GetObjectHash(); m_WorkersFile.AppendObject(WorkerPackage.GetObject(), WorkerId); std::unordered_set AddedChunks; uint64_t AddedBytes = 0; // First add all attachments from the worker package itself for (const CbAttachment& Attachment : WorkerPackage.GetAttachments()) { CompressedBuffer Buffer = Attachment.AsCompressedBinary(); IoBuffer Data = Buffer.GetCompressed().Flatten().AsIoBuffer(); const IoHash ChunkHash = Buffer.DecodeRawHash(); CidStore::InsertResult Result = m_CidStore.AddChunk(Data, ChunkHash, CidStore::InsertMode::kCopyOnly); AddedChunks.insert(ChunkHash); if (Result.New) { AddedBytes += Data.GetSize(); } } // Not all attachments will be present in the worker package, so we need to add // all referenced chunks to ensure that the recording is self-contained and not // referencing data in the main CID store CbObject WorkerDescriptor = WorkerPackage.GetObject(); WorkerDescriptor.IterateAttachments([&](const CbFieldView AttachmentField) { const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); if (!AddedChunks.contains(AttachmentCid)) { IoBuffer AttachmentData = m_ChunkResolver.FindChunkByCid(AttachmentCid); if (AttachmentData) { CidStore::InsertResult Result = m_CidStore.AddChunk(AttachmentData, AttachmentCid, CidStore::InsertMode::kCopyOnly); if (Result.New) { AddedBytes += AttachmentData.GetSize(); } } else { ZEN_WARN("RegisterWorker: could not resolve attachment chunk {} for worker {}", AttachmentCid, WorkerId); } AddedChunks.insert(AttachmentCid); } }); ZEN_INFO("recorded worker {} with {} attachments ({} bytes)", WorkerId, AddedChunks.size(), AddedBytes); } bool ActionRecorder::RecordAction(Ref Action) { bool AllGood = true; Action->ActionObj.IterateAttachments([&](CbFieldView Field) { IoHash AttachData = Field.AsHash(); IoBuffer ChunkData = m_ChunkResolver.FindChunkByCid(AttachData); if (ChunkData) { if (ChunkData.GetContentType() == ZenContentType::kCompressedBinary) { IoHash DecompressedHash; uint64_t RawSize = 0; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), /* out */ DecompressedHash, /* out*/ RawSize); OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; uint64_t BlockSize = 0; if (Compressed.TryGetCompressParameters(/* out */ Compressor, /* out */ CompressionLevel, /* out */ BlockSize)) { if (Compressor == OodleCompressor::NotSet) { CompositeBuffer Decompressed = Compressed.DecompressToComposite(); CompressedBuffer NewCompressed = CompressedBuffer::Compress(std::move(Decompressed), OodleCompressor::Mermaid, OodleCompressionLevel::Fast, BlockSize); ChunkData = NewCompressed.GetCompressed().Flatten().AsIoBuffer(); } } } const uint64_t ChunkSize = ChunkData.GetSize(); m_CidStore.AddChunk(ChunkData, AttachData, CidStore::InsertMode::kCopyOnly); ++m_ChunkCounter; m_ChunkBytesCounter.fetch_add(ChunkSize); } else { AllGood = false; ZEN_WARN("could not resolve chunk {}", AttachData); } }); if (AllGood) { m_ActionsFile.AppendObject(Action->ActionObj, Action->ActionId); ++m_ActionsCounter; return true; } else { return false; } } } // namespace zen::compute #endif // ZEN_WITH_COMPUTE_SERVICES