diff options
| author | Dan Engelbrecht <[email protected]> | 2024-03-14 16:50:18 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-03-14 16:50:18 +0100 |
| commit | 0a935231009cb21680d364ef125f0296a5a5bed6 (patch) | |
| tree | 7e55a67ae60883b0eab71a0d636aeec23f307d14 /src | |
| parent | clean up test linking (#4) (diff) | |
| download | zen-0a935231009cb21680d364ef125f0296a5a5bed6.tar.xz zen-0a935231009cb21680d364ef125f0296a5a5bed6.zip | |
special treatment large oplog attachments v2 (#5)
- Bugfix: Install Ctrl+C handler earlier when doing `zen oplog-export` and `zen oplog-import` to properly cancel jobs
- Improvement: Add ability to block a set of CAS entries from GC in project store
- Improvement: Large attachments and loose files are now split into smaller chunks and stored in blocks during oplog export
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 18 | ||||
| -rw-r--r-- | src/zencore/blake3.cpp | 7 | ||||
| -rw-r--r-- | src/zencore/compress.cpp | 30 | ||||
| -rw-r--r-- | src/zencore/include/zencore/compress.h | 10 | ||||
| -rw-r--r-- | src/zencore/iobuffer.cpp | 7 | ||||
| -rw-r--r-- | src/zencore/iohash.cpp | 7 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 255 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 19 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 1652 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 23 | ||||
| -rw-r--r-- | src/zenstore/chunkedfile.cpp | 505 | ||||
| -rw-r--r-- | src/zenstore/chunking.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/chunkedfile.h | 54 | ||||
| -rw-r--r-- | src/zenutil/basicfile.cpp | 12 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/basicfile.h | 3 |
15 files changed, 1950 insertions, 654 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 2132d428d..e1ee31aaf 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -57,11 +57,14 @@ namespace { void ExecuteAsyncOperation(HttpClient& Http, std::string_view Url, IoBuffer&& Payload) { + signal(SIGINT, SignalCallbackHandler); +#if ZEN_PLATFORM_WINDOWS + signal(SIGBREAK, SignalCallbackHandler); +#endif // ZEN_PLATFORM_WINDOWS if (HttpClient::Response Result = Http.Post(Url, Payload)) { if (Result.StatusCode == HttpResponseCode::Accepted) { - signal(SIGINT, SignalCallbackHandler); bool Cancelled = false; std::string_view JobIdText = Result.AsText(); @@ -136,10 +139,17 @@ namespace { double QueueTimeS = StatusObject["QueueTimeS"].AsDouble(); ZEN_CONSOLE("Queued, waited {:.3} s...", QueueTimeS); } - uint32_t AbortCounter = SignalCounter[SIGINT].load(); - if (SignalCounter[SIGINT] > 0) + uint32_t InterruptCounter = SignalCounter[SIGINT].load(); + uint32_t BreakCounter = 0; +#if ZEN_PLATFORM_WINDOWS + BreakCounter = SignalCounter[SIGBREAK].load(); +#endif // ZEN_PLATFORM_WINDOWS + if (InterruptCounter > 0 || BreakCounter > 0) { - SignalCounter[SIGINT].fetch_sub(AbortCounter); + SignalCounter[SIGINT].fetch_sub(InterruptCounter); +#if ZEN_PLATFORM_WINDOWS + SignalCounter[SIGBREAK].fetch_sub(BreakCounter); +#endif // ZEN_PLATFORM_WINDOWS if (HttpClient::Response DeleteResult = Http.Delete(fmt::format("/admin/jobs/{}", JobId))) { ZEN_CONSOLE("Requested cancel..."); diff --git a/src/zencore/blake3.cpp b/src/zencore/blake3.cpp index bdbc8fb3e..e4edff227 100644 --- a/src/zencore/blake3.cpp +++ b/src/zencore/blake3.cpp @@ -45,14 +45,15 @@ BLAKE3::HashBuffer(const CompositeBuffer& Buffer) for (const SharedBuffer& Segment : Buffer.GetSegments()) { - size_t SegmentSize = Segment.GetSize(); - if (SegmentSize >= (65536 + 32768) && Segment.IsFileReference()) + size_t SegmentSize = Segment.GetSize(); + static const size_t BufferingSize = 512 * 1024; + if 
(SegmentSize >= (BufferingSize + BufferingSize / 2) && Segment.IsFileReference()) { const IoBuffer SegmentBuffer = Segment.AsIoBuffer(); size_t Offset = 0; while (Offset < SegmentSize) { - size_t ChunkSize = Min<size_t>(SegmentSize - Offset, 65536u); + size_t ChunkSize = Min<size_t>(SegmentSize - Offset, BufferingSize); IoBuffer SubRange(SegmentBuffer, Offset, ChunkSize); blake3_hasher_update(&Hasher, SubRange.GetData(), ChunkSize); Offset += ChunkSize; diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index c41bdac42..a8e8a79f4 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -193,15 +193,7 @@ class NoneEncoder final : public BaseEncoder public: [[nodiscard]] CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t /* BlockSize */) const final { - BufferHeader Header; - Header.Method = CompressionMethod::None; - Header.BlockCount = 1; - Header.TotalRawSize = RawData.GetSize(); - Header.TotalCompressedSize = Header.TotalRawSize + sizeof(BufferHeader); - Header.RawHash = BLAKE3::HashBuffer(RawData); - - UniqueBuffer HeaderData = UniqueBuffer::Alloc(sizeof(BufferHeader)); - Header.Write(HeaderData); + UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData)); return CompositeBuffer(HeaderData.MoveToShared(), RawData.MakeOwned()); } }; @@ -1301,6 +1293,26 @@ CompressedBuffer::ValidateCompressedHeader(const IoBuffer& CompressedData, IoHas return detail::BufferHeader::IsValid(SharedBuffer(CompressedData), OutRawHash, OutRawSize); } +size_t +CompressedBuffer::GetHeaderSizeForNoneEncoder() +{ + return sizeof(detail::BufferHeader); +} + +UniqueBuffer +CompressedBuffer::CreateHeaderForNoneEncoder(uint64_t RawSize, const BLAKE3& RawHash) +{ + detail::BufferHeader Header; + Header.Method = detail::CompressionMethod::None; + Header.BlockCount = 1; + Header.TotalRawSize = RawSize; + Header.TotalCompressedSize = Header.TotalRawSize + sizeof(detail::BufferHeader); + 
Header.RawHash = RawHash; + UniqueBuffer HeaderData = UniqueBuffer::Alloc(sizeof(detail::BufferHeader)); + Header.Write(HeaderData); + return HeaderData; +} + uint64_t CompressedBuffer::DecodeRawSize() const { diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h index 44431f299..c51b5407f 100644 --- a/src/zencore/include/zencore/compress.h +++ b/src/zencore/include/zencore/compress.h @@ -94,10 +94,12 @@ public: uint64_t& OutRawSize); [[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(IoBuffer&& CompressedData); [[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(CompositeBuffer&& CompressedData); - [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(IoBuffer&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize); - [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(const IoBuffer& CompressedData, - IoHash& OutRawHash, - uint64_t& OutRawSize); + [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(IoBuffer&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize); + [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(const IoBuffer& CompressedData, + IoHash& OutRawHash, + uint64_t& OutRawSize); + [[nodiscard]] ZENCORE_API static size_t GetHeaderSizeForNoneEncoder(); + [[nodiscard]] ZENCORE_API static UniqueBuffer CreateHeaderForNoneEncoder(uint64_t RawSize, const BLAKE3& RawHash); /** Reset this to null. 
*/ inline void Reset() { CompressedData.Reset(); } diff --git a/src/zencore/iobuffer.cpp b/src/zencore/iobuffer.cpp index c8bc4a629..96a893082 100644 --- a/src/zencore/iobuffer.cpp +++ b/src/zencore/iobuffer.cpp @@ -704,8 +704,9 @@ IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName) IoHash HashBuffer(IoBuffer& Buffer) { - size_t BufferSize = Buffer.Size(); - if (BufferSize >= (65536 + 32768)) + size_t BufferSize = Buffer.Size(); + static const size_t BufferingSize = 512 * 1024; + if (BufferSize >= (BufferingSize + BufferingSize / 2)) { IoBufferFileReference _; if (Buffer.GetFileReference(/* out */ _)) @@ -714,7 +715,7 @@ HashBuffer(IoBuffer& Buffer) IoHashStream HashStream; while (Offset < BufferSize) { - size_t ChunkSize = Min<size_t>(BufferSize - Offset, 65536u); + size_t ChunkSize = Min<size_t>(BufferSize - Offset, BufferingSize); IoBuffer SubRange(Buffer, Offset, ChunkSize); HashStream.Append(SubRange.GetData(), SubRange.GetSize()); Offset += ChunkSize; diff --git a/src/zencore/iohash.cpp b/src/zencore/iohash.cpp index cedee913a..a6bf25f6c 100644 --- a/src/zencore/iohash.cpp +++ b/src/zencore/iohash.cpp @@ -31,14 +31,15 @@ IoHash::HashBuffer(const CompositeBuffer& Buffer) for (const SharedBuffer& Segment : Buffer.GetSegments()) { - size_t SegmentSize = Segment.GetSize(); - if (SegmentSize >= (65536 + 32768) && Segment.IsFileReference()) + size_t SegmentSize = Segment.GetSize(); + static const size_t BufferingSize = 512 * 1024; + if (SegmentSize >= (BufferingSize + BufferingSize / 2) && Segment.IsFileReference()) { const IoBuffer SegmentBuffer = Segment.AsIoBuffer(); size_t Offset = 0; while (Offset < SegmentSize) { - size_t ChunkSize = Min<size_t>(SegmentSize - Offset, 65536u); + size_t ChunkSize = Min<size_t>(SegmentSize - Offset, BufferingSize); IoBuffer SubRange(SegmentBuffer, Offset, ChunkSize); Hasher.Append(SubRange.GetData(), ChunkSize); Offset += ChunkSize; diff --git a/src/zenserver/projectstore/projectstore.cpp 
b/src/zenserver/projectstore/projectstore.cpp index 47f8b7357..cfa53c080 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -986,6 +986,17 @@ ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler) m_Storage->ReplayLogEntries(Entries, [&](CbObjectView Op) { Handler(Op); }); } +size_t +ProjectStore::Oplog::GetOplogEntryCount() const +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return 0; + } + return m_LatestOpMap.size(); +} + void ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Handler) { @@ -1122,6 +1133,79 @@ ProjectStore::Oplog::AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid: } void +ProjectStore::Oplog::CaptureUpdatedLSN(RwLock::ExclusiveLockScope&, uint32_t LSN) +{ + if (m_UpdatedLSNs) + { + m_UpdatedLSNs->push_back(LSN); + } +} + +void +ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes) +{ + m_OplogLock.WithExclusiveLock([this, AttachmentHashes]() { + if (m_NonGCAttachments) + { + m_NonGCAttachments->reserve(m_NonGCAttachments->size() + AttachmentHashes.size()); + m_NonGCAttachments->insert(m_NonGCAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end()); + } + }); +} + +void +ProjectStore::Oplog::EnableUpdateCapture() +{ + m_OplogLock.WithExclusiveLock([&]() { + m_UpdatedLSNs = std::make_unique<std::vector<int>>(); + m_NonGCAttachments = std::make_unique<std::vector<IoHash>>(); + }); +} + +void +ProjectStore::Oplog::DisableUpdateCapture() +{ + m_OplogLock.WithExclusiveLock([&]() { + m_UpdatedLSNs.reset(); + m_NonGCAttachments.reset(); + }); +} + +void +ProjectStore::Oplog::IterateUpdatedLSNs(RwLock::SharedLockScope&, std::function<bool(const CbObjectView& UpdateOp)>&& Callback) +{ + if (m_UpdatedLSNs) + { + for (int UpdatedLSN : *m_UpdatedLSNs) + { + std::optional<CbObject> UpdatedOp = GetOpByIndex(UpdatedLSN); + if (UpdatedOp) + { + if 
(!Callback(UpdatedOp.value())) + { + break; + } + } + } + } +} + +void +ProjectStore::Oplog::IterateAddedAttachments(RwLock::SharedLockScope&, std::function<bool(const IoHash& RawHash)>&& Callback) +{ + if (m_NonGCAttachments) + { + for (const IoHash& ReferenceHash : *m_NonGCAttachments) + { + if (!Callback(ReferenceHash)) + { + break; + } + } + } +} + +void ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, const Oid& FileId, const IoHash& Hash, @@ -1279,10 +1363,7 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn; - if (m_UpdatedLSNs) - { - m_UpdatedLSNs->push_back(OpEntry.OpLsn); - } + CaptureUpdatedLSN(OplogLock, OpEntry.OpLsn); return OpEntry.OpLsn; } @@ -2803,8 +2884,10 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie Attachments.insert(RawHash); }; + auto OnChunkedAttachment = [](const ChunkedInfo&) {}; + RemoteProjectStore::Result RemoteResult = - SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, nullptr); + SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OnChunkedAttachment, nullptr); if (RemoteResult.ErrorCode) { @@ -2865,6 +2948,15 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, } } + size_t ChunkFileSizeLimit = RemoteStoreOptions::DefaultChunkFileSizeLimit; + if (auto Param = Params.GetValue("chunkfilesizelimit"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + ChunkFileSizeLimit = Value.value(); + } + } + CidStore& ChunkStore = m_CidStore; RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( @@ -2873,11 +2965,12 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, *Oplog, MaxBlockSize, MaxChunkEmbedSize, + ChunkFileSizeLimit, /* BuildBlocks */ false, - 
/* IgnoreMissingAttachemnts */ false, - [](CompressedBuffer&&, const IoHash) {}, - [](const IoHash&, const TGetAttachmentBufferFunc&) {}, - [](const std::unordered_set<IoHash, IoHash::Hasher>) {}, + /* IgnoreMissingAttachments */ false, + [](CompressedBuffer&&, const IoHash&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, /* EmbedLooseFiles*/ false); OutResponse = std::move(ContainerResult.ContainerObject); @@ -3239,6 +3332,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); + size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit); bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); @@ -3267,6 +3361,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, + ChunkFileSizeLimit, EmbedLooseFile, Force, IgnoreMissingAttachments](JobContext& Context) { @@ -3276,6 +3371,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op *OplogPtr, MaxBlockSize, MaxChunkEmbedSize, + ChunkFileSizeLimit, EmbedLooseFile, Force, IgnoreMissingAttachments, @@ -3573,7 +3669,7 @@ public: virtual ~ProjectStoreReferenceChecker() { m_OplogLock.reset(); - m_Oplog.m_OplogLock.WithExclusiveLock([&]() { m_Oplog.m_UpdatedLSNs.reset(); }); + m_Oplog.DisableUpdateCapture(); } virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}'", m_Oplog.m_BasePath); } @@ -3598,7 +3694,7 @@ public: m_Oplog.OplogId()); }); - m_Oplog.m_OplogLock.WithExclusiveLock([&]() { 
m_Oplog.m_UpdatedLSNs = std::make_unique<std::vector<int>>(); }); + m_Oplog.EnableUpdateCapture(); RwLock::SharedLockScope __(m_Oplog.m_OplogLock); if (Ctx.IsCancelledFlag) @@ -3608,7 +3704,6 @@ public: m_Oplog.IterateOplog([&](CbObjectView Op) { Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); }); - m_PreCachedLsn = m_Oplog.GetMaxOpIndex(); } } @@ -3632,18 +3727,10 @@ public: m_OplogLock = std::make_unique<RwLock::SharedLockScope>(m_Oplog.m_OplogLock); - if (m_Oplog.m_UpdatedLSNs) - { - for (int UpdatedLSN : *m_Oplog.m_UpdatedLSNs) - { - std::optional<CbObject> UpdatedOp = m_Oplog.GetOpByIndex(UpdatedLSN); - if (UpdatedOp) - { - CbObjectView Op = UpdatedOp.value(); - Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); - } - } - } + m_Oplog.IterateUpdatedLSNs(*m_OplogLock, [&](const CbObjectView& UpdateOp) -> bool { + UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); + return true; + }); } virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override @@ -3676,12 +3763,21 @@ public: } } } + m_Oplog.IterateAddedAttachments(*m_OplogLock, [&](const IoHash& RawHash) -> bool { + if (IoCids.erase(RawHash) == 1) + { + if (IoCids.empty()) + { + return false; + } + } + return true; + }); } ProjectStore::Oplog& m_Oplog; bool m_PreCache; std::unique_ptr<RwLock::SharedLockScope> m_OplogLock; std::vector<IoHash> m_References; - int m_PreCachedLsn = -1; }; std::vector<GcReferenceChecker*> @@ -3792,13 +3888,16 @@ namespace testutils { return Package; }; - std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes) + std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments( + const std::span<const size_t>& Sizes, + OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast) { std::vector<std::pair<Oid, CompressedBuffer>> Result; 
Result.reserve(Sizes.size()); for (size_t Size : Sizes) { - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size))); + CompressedBuffer Compressed = + CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel); Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); } return Result; @@ -3890,6 +3989,99 @@ TEST_CASE("project.store.lifetimes") CHECK(Project->Identifier == "proj1"sv); } +struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse +{ + static const bool ForceDisableBlocks = true; + static const bool ForceEnableTempBlocks = false; +}; + +struct ExportForceDisableBlocksFalse_ForceTempBlocksFalse +{ + static const bool ForceDisableBlocks = false; + static const bool ForceEnableTempBlocks = false; +}; + +struct ExportForceDisableBlocksFalse_ForceTempBlocksTrue +{ + static const bool ForceDisableBlocks = false; + static const bool ForceEnableTempBlocks = true; +}; + +TEST_CASE_TEMPLATE("project.store.export", + Settings, + ExportForceDisableBlocksTrue_ForceTempBlocksFalse, + ExportForceDisableBlocksFalse_ForceTempBlocksFalse, + ExportForceDisableBlocksFalse_ForceTempBlocksTrue) +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + ScopedTemporaryDirectory ExportDir; + + auto JobQueue = MakeJobQueue(1, ""sv); + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"; + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; + std::filesystem::path ProjectFilePath = TempDir.Path() / 
"game" / "game.uproject"; + + Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); + CHECK(Oplog != nullptr); + + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); + Oplog->AppendNewOplogEntry( + CreateOplogPackage(Oid::NewOid(), + CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); + + FileRemoteStoreOptions Options = { + RemoteStoreOptions{.MaxBlockSize = 64u * 1024, .MaxChunkEmbedSize = 32 * 1024u, .ChunkFileSizeLimit = 64u * 1024u}, + /*.FolderPath = */ ExportDir.Path(), + /*.Name = */ std::string("oplog1"), + /*OptionalBaseName = */ std::string(), + /*.ForceDisableBlocks = */ Settings::ForceDisableBlocks, + /*.ForceEnableTempBlocks = */ Settings::ForceEnableTempBlocks}; + std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + RemoteProjectStore::Result ExportResult = SaveOplog(CidStore, + *RemoteStore, + *Project.Get(), + *Oplog, + Options.MaxBlockSize, + Options.MaxChunkEmbedSize, + Options.ChunkFileSizeLimit, + true, + false, + false, + nullptr); + + CHECK(ExportResult.ErrorCode == 0); + + ProjectStore::Oplog* OplogImport = Project->NewOplog("oplog2", {}); + CHECK(OplogImport != nullptr); + RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, false, false, nullptr); + 
CHECK(ImportResult.ErrorCode == 0); + + RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, true, false, nullptr); + CHECK(ImportForceResult.ErrorCode == 0); +} + TEST_CASE("project.store.gc") { using namespace std::literals; @@ -4284,12 +4476,15 @@ TEST_CASE("project.store.block") 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251, 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); - std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes); - std::vector<SharedBuffer> Chunks; + std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes); + std::vector<std::pair<IoHash, FetchChunkFunc>> Chunks; Chunks.reserve(AttachmentSizes.size()); for (const auto& It : AttachmentsWithId) { - Chunks.push_back(It.second.GetCompressed().Flatten()); + Chunks.push_back(std::make_pair(It.second.DecodeRawHash(), + [Buffer = It.second.GetCompressed().Flatten().AsIoBuffer()](const IoHash&) -> CompositeBuffer { + return CompositeBuffer(SharedBuffer(Buffer)); + })); } CompressedBuffer Block = GenerateBlock(std::move(Chunks)); IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer(); diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 897231a2e..d8c053649 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -91,9 +91,11 @@ public: std::vector<ChunkInfo> GetAllChunksInfo(); void IterateChunkMap(std::function<void(const Oid&, const IoHash& Hash)>&& Fn); - void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn); - void IterateOplog(std::function<void(CbObjectView)>&& Fn); - void IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Fn); + void IterateFileMap(std::function<void(const 
Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn); + void IterateOplog(std::function<void(CbObjectView)>&& Fn); + void IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Fn); + size_t GetOplogEntryCount() const; + std::optional<CbObject> GetOpByKey(const Oid& Key); std::optional<CbObject> GetOpByIndex(int Index); int GetOpIndexByKey(const Oid& Key); @@ -140,6 +142,14 @@ public: void AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid::Hasher>& ChunkMappings); + void CaptureUpdatedLSN(RwLock::ExclusiveLockScope& OplogLock, uint32_t LSN); + void CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes); + + void EnableUpdateCapture(); + void DisableUpdateCapture(); + void IterateUpdatedLSNs(RwLock::SharedLockScope& OplogLock, std::function<bool(const CbObjectView& UpdateOp)>&& Callback); + void IterateAddedAttachments(RwLock::SharedLockScope& OplogLock, std::function<bool(const IoHash& RawHash)>&& Callback); + private: struct FileMapEntry { @@ -164,7 +174,8 @@ public: tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key - std::unique_ptr<std::vector<int>> m_UpdatedLSNs; + std::unique_ptr<std::vector<int>> m_UpdatedLSNs; + std::unique_ptr<std::vector<IoHash>> m_NonGCAttachments; RefPtr<OplogStorage> m_Storage; std::string m_OplogId; diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 672292290..ce3411114 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -12,6 +12,7 @@ #include <zencore/stream.h> #include <zencore/timer.h> #include <zencore/workthreadpool.h> +#include <zenstore/chunkedfile.h> #include <zenstore/cidstore.h> #include <zenutil/workerpools.h> @@ -38,6 +39,14 @@ namespace zen { CbArray("chunks") // Optional, only if we are not creating blocks 
(Zen) CbFieldType::BinaryAttachment // Chunk attachment hashes + CbArray("chunkedfiles"); + CbFieldType::Hash "rawhash" + CbFieldType::Integer "rawsize" + CbArray("chunks"); + CbFieldType::Hash "chunkhash" + CbArray("sequence"); + CbFieldType::Integer chunks index + CompressedBinary ChunkBlock { VarUInt ChunkCount @@ -143,30 +152,36 @@ IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& C }; CompressedBuffer -GenerateBlock(std::vector<SharedBuffer>&& Chunks) +GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks) { - size_t ChunkCount = Chunks.size(); - SharedBuffer SizeBuffer; + std::vector<SharedBuffer> ChunkSegments; + ChunkSegments.resize(1); + ChunkSegments.reserve(1 + FetchChunks.size()); + size_t ChunkCount = FetchChunks.size(); { IoBuffer TempBuffer(ChunkCount * 9); MutableMemoryView View = TempBuffer.GetMutableView(); uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData()); uint8_t* BufferEndPtr = BufferStartPtr; BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); - auto It = Chunks.begin(); - while (It != Chunks.end()) + for (const auto& It : FetchChunks) { - BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(It->GetSize()), BufferEndPtr); - It++; + CompositeBuffer Chunk = It.second(It.first); + uint64_t ChunkSize = 0; + std::span<const SharedBuffer> Segments = Chunk.GetSegments(); + for (const SharedBuffer& Segment : Segments) + { + ChunkSize += Segment.GetSize(); + ChunkSegments.push_back(Segment); + } + BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr); } ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); - SizeBuffer = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); + ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); } CompressedBuffer CompressedBlock = - 
CompressedBuffer::Compress(CompositeBuffer(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks))), - OodleCompressor::Mermaid, - OodleCompressionLevel::None); + CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None); return CompressedBlock; } @@ -180,7 +195,7 @@ struct Block void CreateBlock(WorkerThreadPool& WorkerPool, Latch& OpSectionsLatch, - std::vector<SharedBuffer>&& ChunksInBlock, + std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, RwLock& SectionsLock, std::vector<Block>& Blocks, size_t BlockIndex, @@ -251,12 +266,16 @@ WriteToTempFile(CompressedBuffer&& CompressedBuffer, std::filesystem::path Path) RetriesLeft--; return true; }); - uint64_t Offset = 0; - for (const SharedBuffer& Buffer : CompressedBuffer.GetCompressed().GetSegments()) { - BlockFile.Write(Buffer.GetView(), Offset); - Offset += Buffer.GetSize(); + CompositeBuffer Compressed = std::move(CompressedBuffer).GetCompressed(); + BasicFileWriter BlockWriter(BlockFile, 64u * 1024u); + for (const SharedBuffer& Segment : Compressed.GetSegments()) + { + size_t SegmentSize = Segment.GetSize(); + BlockWriter.Write(Segment.GetData(), SegmentSize, Offset); + Offset += SegmentSize; + } } void* FileHandle = BlockFile.Detach(); BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); @@ -270,13 +289,14 @@ BuildContainer(CidStore& ChunkStore, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, + size_t ChunkFileSizeLimit, bool BuildBlocks, bool IgnoreMissingAttachments, const std::vector<Block>& KnownBlocks, WorkerThreadPool& WorkerPool, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, - const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, + const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& 
OnBlockChunks, bool EmbedLooseFiles, JobContext* OptionalContext, AsyncRemoteResult& RemoteResult) @@ -287,22 +307,24 @@ BuildContainer(CidStore& ChunkStore, CbObject OplogContainerObject; { - std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; - std::unordered_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseAttachments; + struct FoundAttachment + { + std::filesystem::path RawPath; // If not stored in cid + uint64_t Size = 0; + Oid Key = Oid::Zero; + }; + + std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments; RwLock BlocksLock; std::vector<Block> Blocks; CompressedBuffer OpsBuffer; - std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; - - std::unordered_map<IoHash, int, IoHash::Hasher> Attachments; - std::filesystem::path AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); CreateDirectories(AttachmentTempPath); - auto RewriteOp = [&](int LSN, CbObjectView Op, const std::function<void(CbObjectView)>& CB) { + auto RewriteOp = [&](const Oid& Key, CbObjectView Op, const std::function<void(CbObjectView)>& CB) { bool OpRewritten = false; CbArrayView Files = Op["files"sv].AsArrayView(); if (Files.Num() == 0) @@ -316,6 +338,15 @@ BuildContainer(CidStore& ChunkStore, for (CbFieldView& Field : Files) { + if (IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + CB(Op); + return; + } + bool CopyField = true; if (CbObjectView View = Field.AsObjectView()) @@ -344,73 +375,35 @@ BuildContainer(CidStore& ChunkStore, throw std::runtime_error(fmt::format("failed to open file '{}'", FilePath)); } } - SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath)); - // Loose file, just hash it for now and leave compression for later via callback - - const uint64_t RawSize = DataBuffer.GetSize(); - if 
(RawSize > MaxChunkEmbedSize) - { - IoHashStream Hasher; - CompositeBuffer RawData(DataBuffer); - UniqueBuffer RawBlockCopy; - CompositeBuffer::Iterator It = RawData.GetIterator(0); - const uint64_t BlockSize = MaxChunkEmbedSize; - for (uint64_t RawOffset = 0; RawOffset < RawSize;) - { - const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize); - const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy); - Hasher.Append(RawBlock); - RawOffset += RawBlockSize; - } - DataHash = Hasher.GetHash(); - LargeChunkHashes.insert(DataHash); - } - else { - DataHash = IoHash::HashBuffer(DataBuffer); + Stopwatch HashTimer; + SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath)); + DataHash = IoHash::HashBuffer(CompositeBuffer(DataBuffer)); + ZEN_INFO("Hashed loose file '{}' {}: {} in {}", + FilePath, + NiceBytes(DataBuffer.GetSize()), + DataHash, + NiceTimeSpanMs(HashTimer.GetElapsedTimeMs())); } - LooseAttachments.insert_or_assign( - DataHash, - [AttachmentBuffer = std::move(DataBuffer), &Oplog, AttachmentTempPath](const IoHash& DataHash) -> IoBuffer { - Stopwatch AttachmentTimer; - uint64_t RawSize = AttachmentBuffer.GetSize(); - CompressedBuffer Compressed = - CompressedBuffer::Compress(AttachmentBuffer, OodleCompressor::Mermaid, OodleCompressionLevel::Normal); - ZEN_ASSERT(Compressed.DecodeRawHash() == DataHash); - uint64_t PayloadSize = Compressed.GetCompressedSize(); - ZEN_INFO("Compressed loose file attachment {} ({} -> {}) in {}", - DataHash, - NiceBytes(RawSize), - NiceBytes(PayloadSize), - NiceTimeSpanMs(static_cast<uint64_t>(AttachmentTimer.GetElapsedTimeMs()))); - std::filesystem::path AttachmentPath = AttachmentTempPath; - AttachmentPath.append(DataHash.ToHexString()); - - IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath); - ZEN_INFO("Saved temp attachment to '{}', {}", AttachmentPath, NiceBytes(TempAttachmentBuffer.GetSize())); - return TempAttachmentBuffer; - }); + // Rewrite 
file array entry with new data reference + CbObjectWriter Writer; + RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { + if (Field.GetName() == "data"sv) + { + // omit this field as we will write it explicitly ourselves + return true; + } + return false; + }); + Writer.AddBinaryAttachment("data"sv, DataHash); + UploadAttachments.insert_or_assign(DataHash, FoundAttachment{.RawPath = FilePath, .Key = Key}); + + CbObject RewrittenOp = Writer.Save(); + Cbo.AddObject(std::move(RewrittenOp)); + CopyField = false; } - - // Rewrite file array entry with new data reference - CbObjectWriter Writer; - RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { - if (Field.GetName() == "data"sv) - { - // omit this field as we will write it explicitly ourselves - return true; - } - return false; - }); - Writer.AddBinaryAttachment("data"sv, DataHash); - - CbObject RewrittenOp = Writer.Save(); - Cbo.AddObject(std::move(RewrittenOp)); - CopyField = false; - - Attachments.insert_or_assign(DataHash, LSN); } if (CopyField) @@ -449,24 +442,24 @@ BuildContainer(CidStore& ChunkStore, Stopwatch Timer; - tsl::robin_map<int, std::string> OpLSNToKey; - CompressedBuffer CompressedOpsSection; + size_t TotalOpCount = Oplog.GetOplogEntryCount(); + CompressedBuffer CompressedOpsSection; { + Stopwatch RewriteOplogTimer; CbObjectWriter SectionOpsWriter; SectionOpsWriter.BeginArray("ops"sv); { - Stopwatch RewriteOplogTimer; - Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) { + Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) { if (RemoteResult.IsError()) { return; } - std::string_view Key = Op["key"sv].AsString(); - OpLSNToKey.insert({LSN, std::string(Key)}); - Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); }); + Op.IterateAttachments([&](CbFieldView FieldView) { + UploadAttachments.insert_or_assign(FieldView.AsAttachment(), FoundAttachment{.Key = Key}); + 
}); if (EmbedLooseFiles) { - RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); + RewriteOp(Key, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); } else { @@ -476,30 +469,42 @@ BuildContainer(CidStore& ChunkStore, if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } - if (OpCount % 100000 == 0) + if (OpCount % 1000 == 0) { - ReportMessage(OptionalContext, fmt::format("Building oplog, at op {}...", OpCount)); + ReportProgress(OptionalContext, + fmt::format("Building oplog: {} ops processed", OpCount), + TotalOpCount, + TotalOpCount - OpCount); } }); + if (RemoteResult.IsError()) + { + return {}; + } if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } - ReportMessage(OptionalContext, - fmt::format("Rewrote {} ops to new oplog in {}", - OpCount, - NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs())))); + ReportProgress(OptionalContext, fmt::format("Building oplog: {} ops processed", OpCount), TotalOpCount, 0); } SectionOpsWriter.EndArray(); // "ops" + ReportMessage(OptionalContext, + fmt::format("Rewrote {} ops to new oplog in {}", + OpCount, + NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs())))); + { Stopwatch CompressOpsTimer; - CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), - OodleCompressor::Mermaid, - OodleCompressionLevel::Normal); + CompressedOpsSection = + CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast); ReportMessage(OptionalContext, 
fmt::format("Compressed oplog section {} ({} -> {}) in {}", CompressedOpsSection.DecodeRawHash(), @@ -512,357 +517,706 @@ BuildContainer(CidStore& ChunkStore, if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } - if (!Attachments.empty() && !KnownBlocks.empty()) - { - ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size())); - Stopwatch ReuseTimer; - - size_t SkippedAttachmentCount = 0; - for (const Block& KnownBlock : KnownBlocks) + auto FindReuseBlocks = [](const std::vector<Block>& KnownBlocks, + const std::unordered_set<IoHash, IoHash::Hasher>& Attachments, + JobContext* OptionalContext) -> std::vector<size_t> { + std::vector<size_t> ReuseBlockIndexes; + if (!Attachments.empty() && !KnownBlocks.empty()) { - size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); - if (BlockAttachmentCount == 0) - { - continue; - } - size_t FoundAttachmentCount = 0; - for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) + ReportMessage( + OptionalContext, + fmt::format("Checking {} Attachments against {} known blocks for reuse", Attachments.size(), KnownBlocks.size())); + Stopwatch ReuseTimer; + + for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) { - if (Attachments.contains(KnownHash)) + const Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); + if (BlockAttachmentCount == 0) { - FoundAttachmentCount++; + continue; } - } - - size_t ReusePercent = (FoundAttachmentCount * 100) / BlockAttachmentCount; - // TODO: Configure reuse-level - if (ReusePercent > 80) - { - ZEN_DEBUG("Reusing block {}. 
{} attachments found, usage level: {}%", - KnownBlock.BlockHash, - FoundAttachmentCount, - ReusePercent); + size_t FoundAttachmentCount = 0; for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) { - Attachments.erase(KnownHash); - SkippedAttachmentCount++; + if (Attachments.contains(KnownHash)) + { + FoundAttachmentCount++; + } } - Blocks.push_back(KnownBlock); - } - else if (FoundAttachmentCount > 0) - { - ZEN_DEBUG("Skipping block {}. {} attachments found, usage level: {}%", - KnownBlock.BlockHash, - FoundAttachmentCount, - ReusePercent); + size_t ReusePercent = (FoundAttachmentCount * 100) / BlockAttachmentCount; + // TODO: Configure reuse-level + if (ReusePercent > 80) + { + ZEN_DEBUG("Reusing block {}. {} attachments found, usage level: {}%", + KnownBlock.BlockHash, + FoundAttachmentCount, + ReusePercent); + ReuseBlockIndexes.push_back(KnownBlockIndex); + } + else if (FoundAttachmentCount > 0) + { + ZEN_DEBUG("Skipping block {}. {} attachments found, usage level: {}%", + KnownBlock.BlockHash, + FoundAttachmentCount, + ReusePercent); + } } } - ReportMessage(OptionalContext, - fmt::format("Reusing {} out of {} known blocks, skipping upload of {} attachments, completed in {}", - Blocks.size(), - KnownBlocks.size(), - SkippedAttachmentCount, - NiceTimeSpanMs(static_cast<uint64_t>(ReuseTimer.GetElapsedTimeMs())))); - } + return ReuseBlockIndexes; + }; - if (IsCancelled(OptionalContext)) + std::unordered_set<IoHash, IoHash::Hasher> FoundHashes; + FoundHashes.reserve(UploadAttachments.size()); + for (const auto& It : UploadAttachments) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - return {}; + FoundHashes.insert(It.first); } - ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size())); - - // Sort attachments so we get predictable blocks for the same oplog upload - std::vector<IoHash> SortedAttachments; + size_t ReusedAttachmentCount = 0; + 
std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext); + for (size_t KnownBlockIndex : ReusedBlockIndexes) { - SortedAttachments.reserve(Attachments.size()); - for (const auto& It : Attachments) + const Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) { - SortedAttachments.push_back(It.first); + if (UploadAttachments.erase(KnownHash) == 1) + { + ReusedAttachmentCount++; + } } - std::sort(SortedAttachments.begin(), - SortedAttachments.end(), - [&Attachments, &OpLSNToKey](const IoHash& Lhs, const IoHash& Rhs) { - auto LhsLNSIt = Attachments.find(Lhs); - ZEN_ASSERT_SLOW(LhsLNSIt != Attachments.end()); - auto RhsLNSIt = Attachments.find(Rhs); - ZEN_ASSERT_SLOW(RhsLNSIt != Attachments.end()); - if (LhsLNSIt->second == RhsLNSIt->second) - { - return Lhs < Rhs; - } - auto LhsKeyIt = OpLSNToKey.find(LhsLNSIt->second); - ZEN_ASSERT_SLOW(LhsKeyIt != OpLSNToKey.end()); - auto RhsKeyIt = OpLSNToKey.find(RhsLNSIt->second); - ZEN_ASSERT_SLOW(RhsKeyIt != OpLSNToKey.end()); - return LhsKeyIt->second < RhsKeyIt->second; - }); } - if (IsCancelled(OptionalContext)) + struct ChunkedFile { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - return {}; - } - ReportMessage(OptionalContext, - fmt::format("Assembling {} attachments from {} ops into blocks and loose attachments", - SortedAttachments.size(), - OpLSNToKey.size())); + IoBuffer Source; - for (const IoHash& AttachmentHash : LargeChunkHashes) - { - if (IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - return {}; - } + ChunkedInfoWithSource Chunked; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkLoookup; + }; + std::vector<ChunkedFile> ChunkedFiles; - if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end()) - { - OnLargeAttachment(AttachmentHash, std::move(It->second)); - 
LooseAttachments.erase(It); - } - else - { - OnLargeAttachment(AttachmentHash, - [&ChunkStore](const IoHash& AttachmentHash) { return ChunkStore.FindChunkByCid(AttachmentHash); }); - } - } - size_t LargeAttachmentCount = LargeChunkHashes.size(); - - Latch BlockCreateLatch(1); - size_t GeneratedBlockCount = 0; - size_t BlockSize = 0; - std::vector<SharedBuffer> ChunksInBlock; - int LastLSNOp = -1; - auto GetPayload = [&](const IoHash& AttachmentHash) { - if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end()) - { - IoBuffer Payload = It->second(AttachmentHash); - LooseAttachments.erase(It); - return Payload; - } - return ChunkStore.FindChunkByCid(AttachmentHash); + auto ChunkFile = [AttachmentTempPath](const IoHash& RawHash, + IoBuffer& RawData, + const IoBufferFileReference& FileRef, + JobContext*) -> ChunkedFile { + ChunkedFile Chunked; + Stopwatch Timer; + + uint64_t Offset = FileRef.FileChunkOffset; + uint64_t Size = FileRef.FileChunkSize; + + BasicFile SourceFile; + SourceFile.Attach(FileRef.FileHandle); + auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); }); + + Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams); + Chunked.Source = RawData; + + ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}", + RawHash, + NiceBytes(Chunked.Chunked.Info.RawSize), + Chunked.Chunked.Info.ChunkHashes.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + return Chunked; }; - uint32_t ResolvedLargeCount = 0; - uint32_t ResolvedSmallCount = 0; - uint32_t ResolvedFailedCount = 0; - uint32_t ComposedBlocks = 0; + RwLock ResolveLock; + std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; + std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; + std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> LooseUploadAttachments; + std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; - uint64_t FetchAttachmentsStartMS = 
Timer.GetElapsedTimeMs(); + ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount)); - for (auto HashIt = SortedAttachments.begin(); HashIt != SortedAttachments.end(); HashIt++) + Latch ResolveAttachmentsLatch(1); + for (auto& It : UploadAttachments) { if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - break; - } - if ((ResolvedLargeCount + ResolvedSmallCount) % 1000 == 0) - { - ReportProgress(OptionalContext, - fmt::format("Resolving attachments: {} large, {} small, {} blocks assembled", - ResolvedLargeCount, - ResolvedSmallCount, - ComposedBlocks), - SortedAttachments.size(), - SortedAttachments.size() - (ResolvedLargeCount + ResolvedSmallCount + ResolvedFailedCount)); - } - const IoHash& AttachmentHash(*HashIt); - if (LargeChunkHashes.contains(AttachmentHash)) - { - ResolvedLargeCount++; - continue; + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; } - IoBuffer Payload = GetPayload(AttachmentHash); - if (!Payload) - { - auto It = Attachments.find(AttachmentHash); - ZEN_ASSERT(It != Attachments.end()); - std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second); - ZEN_ASSERT(Op.has_value()); - ExtendableStringBuilder<1024> Sb; - Sb.Append("Failed to find attachment '"); - Sb.Append(AttachmentHash.ToHexString()); - Sb.Append("' for op: \n"); - Op.value().ToJson(Sb); - ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView())); - - if (IgnoreMissingAttachments) - { - ResolvedFailedCount++; - continue; - } - else + ResolveAttachmentsLatch.AddCount(1); + + WorkerPool.ScheduleWork([&ChunkStore, + UploadAttachment = &It.second, + RawHash = It.first, + &ResolveAttachmentsLatch, + &ResolveLock, + &ChunkedHashes, + &LargeChunkHashes, + &ChunkedUploadAttachments, + &LooseUploadAttachments, + 
&MissingHashes, + &OnLargeAttachment, + &AttachmentTempPath, + &ChunkFile, + &ChunkedFiles, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + &RemoteResult, + OptionalContext]() { + auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); + try { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) + if (IsCancelled(OptionalContext)) { - ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + return; + } + + if (!UploadAttachment->RawPath.empty()) + { + const std::filesystem::path& FilePath = UploadAttachment->RawPath; + IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); + if (RawData) + { + if (RawData.GetSize() > ChunkFileSizeLimit) + { + IoBufferFileReference FileRef; + (void)RawData.GetFileReference(FileRef); + + ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext); + ResolveLock.WithExclusiveLock( + [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { + ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); + ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); + for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) + { + ChunkedHashes.insert(ChunkHash); + } + ChunkedFiles.emplace_back(std::move(Chunked)); + }); + } + else if (RawData.GetSize() > (MaxChunkEmbedSize * 2)) + { + // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't + // it will be a loose attachment instead of going into a block + OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) { + size_t RawSize = RawData.GetSize(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), + OodleCompressor::Mermaid, + OodleCompressionLevel::VeryFast); + + std::filesystem::path AttachmentPath = 
AttachmentTempPath; + AttachmentPath.append(RawHash.ToHexString()); + + IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath); + ZEN_INFO("Saved temp attachment to '{}', {} ({})", + AttachmentPath, + NiceBytes(RawSize), + NiceBytes(TempAttachmentBuffer.GetSize())); + return TempAttachmentBuffer; + }); + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } + else + { + size_t RawSize = RawData.GetSize(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData), + OodleCompressor::Mermaid, + OodleCompressionLevel::VeryFast); + + std::filesystem::path AttachmentPath = AttachmentTempPath; + AttachmentPath.append(RawHash.ToHexString()); + + IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath); + ZEN_INFO("Saved temp attachment to '{}', {} ({})", + AttachmentPath, + NiceBytes(RawSize), + NiceBytes(TempAttachmentBuffer.GetSize())); + + if (Compressed.GetCompressedSize() > MaxChunkEmbedSize) + { + OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; }); + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } + else + { + UploadAttachment->Size = Compressed.GetCompressedSize(); + ResolveLock.WithExclusiveLock( + [RawHash, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { + LooseUploadAttachments.insert_or_assign(RawHash, std::move(Data)); + }); + } + } + } + else + { + ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); + } + } + else + { + IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); + if (Data) + { + auto GetForChunking = + [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool { + if (Data.IsWholeFile()) + { + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer Compressed = + 
CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); + if (Compressed) + { + if (VerifyRawSize > ChunkFileSizeLimit) + { + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize; + if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + if (CompressionLevel == OodleCompressionLevel::None) + { + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + if (Decompressed) + { + std::span<const SharedBuffer> Segments = Decompressed.GetSegments(); + if (Segments.size() == 1) + { + IoBuffer DecompressedData = Segments[0].AsIoBuffer(); + if (DecompressedData.GetFileReference(OutFileRef)) + { + return true; + } + } + } + } + } + } + } + } + return false; + }; + + IoBufferFileReference FileRef; + if (GetForChunking(ChunkFileSizeLimit, Data, FileRef)) + { + ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext); + ResolveLock.WithExclusiveLock( + [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { + ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); + ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); + for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) + { + ChunkedHashes.insert(ChunkHash); + } + ChunkedFiles.emplace_back(std::move(Chunked)); + }); + } + else if (Data.GetSize() > MaxChunkEmbedSize) + { + OnLargeAttachment(RawHash, + [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); }); + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } + else + { + UploadAttachment->Size = Data.GetSize(); + } + } + else + { + ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); + } } - return {}; } - } + catch (std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to resolve 
attachment {}", RawHash), + Ex.what()); + } + }); + } + ResolveAttachmentsLatch.CountDown(); - uint64_t PayloadSize = Payload.GetSize(); - if (PayloadSize > MaxChunkEmbedSize) + while (!ResolveAttachmentsLatch.Wait(1000)) + { + ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining(); + if (IsCancelled(OptionalContext)) { - if (LargeChunkHashes.insert(AttachmentHash).second) + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + while (!ResolveAttachmentsLatch.Wait(1000)) { - OnLargeAttachment(AttachmentHash, [Payload = std::move(Payload)](const IoHash&) { return std::move(Payload); }); - LargeAttachmentCount++; + Remaining = ResolveAttachmentsLatch.Remaining(); + ReportProgress(OptionalContext, + fmt::format("Aborting, {} attachments remaining...", Remaining), + UploadAttachments.size(), + Remaining); } - ResolvedLargeCount++; - continue; + ReportProgress(OptionalContext, fmt::format("Resolving attachments, {} remaining...", 0), UploadAttachments.size(), 0); + return {}; } - else + ReportProgress(OptionalContext, + fmt::format("Resolving attachments, {} remaining...", Remaining), + UploadAttachments.size(), + Remaining); + } + if (UploadAttachments.size() > 0) + { + ReportProgress(OptionalContext, fmt::format("Resolving attachments, {} remaining...", 0), UploadAttachments.size(), 0); + } + + if (IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } + + for (const IoHash& AttachmentHash : MissingHashes) + { + auto It = UploadAttachments.find(AttachmentHash); + ZEN_ASSERT(It != UploadAttachments.end()); + std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key); + ZEN_ASSERT(Op.has_value()); 
+ ExtendableStringBuilder<1024> Sb; + Sb.Append("Failed to find attachment '"); + Sb.Append(AttachmentHash.ToHexString()); + Sb.Append("' for op: \n"); + Op.value().ToJson(Sb); + + if (IgnoreMissingAttachments) { - ResolvedSmallCount++; + ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView())); } - - if (!BlockAttachmentHashes.insert(AttachmentHash).second) + else { - continue; + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); + return {}; } + UploadAttachments.erase(AttachmentHash); + } - auto It = Attachments.find(AttachmentHash); - const int CurrentOpLSN = It->second; + for (const auto& It : ChunkedUploadAttachments) + { + UploadAttachments.erase(It.first); + } + for (const auto& It : LargeChunkHashes) + { + UploadAttachments.erase(It); + } - BlockSize += PayloadSize; - if (BuildBlocks) + std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext); + for (size_t KnownBlockIndex : ReusedBlockIndexes) + { + const Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) { - ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload))); + if (ChunkedHashes.erase(KnownHash) == 1) + { + ReusedAttachmentCount++; + } } - else + } + + ReusedBlockIndexes.insert(ReusedBlockIndexes.end(), ReusedBlockFromChunking.begin(), ReusedBlockFromChunking.end()); + std::sort(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end()); + auto UniqueKnownBlocksEnd = std::unique(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end()); + size_t ReuseBlockCount = std::distance(ReusedBlockIndexes.begin(), UniqueKnownBlocksEnd); + if (ReuseBlockCount > 0) + { + Blocks.reserve(ReuseBlockCount); + for (auto It = ReusedBlockIndexes.begin(); It != UniqueKnownBlocksEnd; It++) { - Payload = {}; + Blocks.push_back(KnownBlocks[*It]); } + ReportMessage(OptionalContext, fmt::format("Reused {} attachments from {} blocks", 
ReusedAttachmentCount, ReuseBlockCount)); + } + + std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments; + SortedUploadAttachments.reserve(UploadAttachments.size()); + for (const auto& It : UploadAttachments) + { + SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key)); + } + + if (IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } + + ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount)); + + // Sort attachments so we get predictable blocks for the same oplog upload + std::sort(SortedUploadAttachments.begin(), + SortedUploadAttachments.end(), + [](const std::pair<IoHash, Oid>& Lhs, const std::pair<IoHash, Oid>& Rhs) { + if (Lhs.second == Rhs.second) + { + // Same key, sort by raw hash + return Lhs.first < Rhs.first; + } + // Sort by key + return Lhs.second < Rhs.second; + }); + + std::vector<size_t> ChunkedFilesOrder; + ChunkedFilesOrder.reserve(ChunkedFiles.size()); + for (size_t Index = 0; Index < ChunkedFiles.size(); Index++) + { + ChunkedFilesOrder.push_back(Index); + } + std::sort(ChunkedFilesOrder.begin(), ChunkedFilesOrder.end(), [&ChunkedFiles](size_t Lhs, size_t Rhs) { + return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash; + }); + + // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded + // ChunkedHashes contains all chunked up chunks to be composed into blocks + + if (IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } + 
ReportMessage(OptionalContext, + fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments", + SortedUploadAttachments.size(), + ChunkedHashes.size(), + TotalOpCount)); + + if (IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } + + // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded + // ChunkedHashes contains all chunked up chunks to be composed into blocks + + size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size(); + size_t ChunksAssembled = 0; + ReportMessage(OptionalContext, fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount)); - if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp)) + Latch BlockCreateLatch(1); + size_t GeneratedBlockCount = 0; + size_t BlockSize = 0; + std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock; + + Oid LastOpKey = Oid::Zero; + uint32_t ComposedBlocks = 0; + + uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs(); + try + { + uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs(); + std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; + auto NewBlock = [&]() { + size_t BlockIndex = AddBlock(BlocksLock, Blocks); + size_t ChunkCount = ChunksInBlock.size(); + if (BuildBlocks) + { + CreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); + ComposedBlocks++; + } + else + { + ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size()); + OnBlockChunks(std::move(ChunksInBlock)); + } + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope 
_(BlocksLock); + Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); + } + uint64_t NowMS = Timer.GetElapsedTimeMs(); + ZEN_INFO("Assembled block {} with {} chunks in {} ({})", + BlockIndex, + ChunkCount, + NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS), + NiceBytes(BlockSize)); + FetchAttachmentsStartMS = NowMS; + BlockAttachmentHashes.clear(); + ChunksInBlock.clear(); + BlockSize = 0; + GeneratedBlockCount++; + }; + + for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++) { - size_t BlockIndex = AddBlock(BlocksLock, Blocks); - size_t ChunkCount = ChunksInBlock.size(); - if (BuildBlocks) + if (IsCancelled(OptionalContext)) { - CreateBlock(WorkerPool, - BlockCreateLatch, - std::move(ChunksInBlock), - BlocksLock, - Blocks, - BlockIndex, - AsyncOnBlock, - RemoteResult); - ComposedBlocks++; + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + break; } - else + if (ChunksAssembled % 1000 == 0) { - ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size()); - OnBlockChunks(BlockAttachmentHashes); + ReportProgress( + OptionalContext, + fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), + ChunkAssembleCount, + ChunkAssembleCount - ChunksAssembled); } + const IoHash& RawHash(HashIt->first); + const Oid CurrentOpKey = HashIt->second; + const IoHash& AttachmentHash(HashIt->first); + auto InfoIt = UploadAttachments.find(RawHash); + ZEN_ASSERT(InfoIt != UploadAttachments.end()); + uint64_t PayloadSize = InfoIt->second.Size; + + if (BlockAttachmentHashes.insert(AttachmentHash).second) { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - 
RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), - BlockAttachmentHashes.begin(), - BlockAttachmentHashes.end()); + if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end()) + { + ChunksInBlock.emplace_back(std::make_pair(RawHash, [IoBuffer = SharedBuffer(It->second)](const IoHash&) { + return CompositeBuffer(IoBuffer); + })); + LooseUploadAttachments.erase(It); + } + else + { + ChunksInBlock.emplace_back(std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) { + return CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash))); + })); + } + BlockSize += PayloadSize; + + if (BlockSize >= MaxBlockSize && (CurrentOpKey != LastOpKey)) + { + NewBlock(); + } + LastOpKey = CurrentOpKey; + ChunksAssembled++; } - uint64_t NowMS = Timer.GetElapsedTimeMs(); - ZEN_INFO("Assembled block {} with {} chunks in {} ({})", - BlockIndex, - ChunkCount, - NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS), - NiceBytes(BlockSize)); - FetchAttachmentsStartMS = NowMS; - BlockAttachmentHashes.clear(); - ChunksInBlock.clear(); - BlockSize = 0; - GeneratedBlockCount++; } - LastLSNOp = CurrentOpLSN; - } - - if (BlockSize > 0) - { - if (!IsCancelled(OptionalContext)) + if (!RemoteResult.IsError()) { - size_t BlockIndex = AddBlock(BlocksLock, Blocks); - size_t ChunkCount = ChunksInBlock.size(); - if (BuildBlocks) + // Keep the chunked files as separate blocks to make the blocks generated + // more consistent + if (BlockSize > 0) { - CreateBlock(WorkerPool, - BlockCreateLatch, - std::move(ChunksInBlock), - BlocksLock, - Blocks, - BlockIndex, - AsyncOnBlock, - RemoteResult); - ComposedBlocks++; + NewBlock(); } - else + + for (size_t ChunkedFileIndex : ChunkedFilesOrder) { - ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size()); - OnBlockChunks(BlockAttachmentHashes); + const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex]; + const ChunkedInfoWithSource& Chunked = 
ChunkedFile.Chunked; + size_t ChunkCount = Chunked.Info.ChunkHashes.size(); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++) + { + if (IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + break; + } + if (ChunksAssembled % 1000 == 0) + { + ReportProgress(OptionalContext, + fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled", + ChunksAssembled, + ComposedBlocks), + ChunkAssembleCount, + ChunkAssembleCount - ChunksAssembled); + } + const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex]; + if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end()) + { + if (BlockAttachmentHashes.insert(ChunkHash).second) + { + const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex]; + ChunksInBlock.emplace_back(std::make_pair( + ChunkHash, + [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](const IoHash&) { + return CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), + OodleCompressor::Mermaid, + OodleCompressionLevel::None) + .GetCompressed(); + })); + BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size; + if (BuildBlocks) + { + if (BlockSize >= MaxBlockSize) + { + NewBlock(); + } + } + ChunksAssembled++; + } + ChunkedHashes.erase(FindIt); + } + } } + } + + if (BlockSize > 0 && !RemoteResult.IsError()) + { + if (!IsCancelled(OptionalContext)) { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), - BlockAttachmentHashes.begin(), - BlockAttachmentHashes.end()); + NewBlock(); } - uint64_t NowMS = Timer.GetElapsedTimeMs(); - ZEN_INFO("Assembled block {} with {} 
chunks in {} ({})", - BlockIndex, - ChunkCount, - NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS), - NiceBytes(BlockSize)); - FetchAttachmentsStartMS = NowMS; - - BlockAttachmentHashes.clear(); - ChunksInBlock.clear(); - BlockSize = 0; - GeneratedBlockCount++; } - } - ReportProgress(OptionalContext, - fmt::format("Resolving attachments: {} large, {} small, {} blocks assembled", - ResolvedLargeCount, - ResolvedSmallCount, - ComposedBlocks), - SortedAttachments.size(), - 0); - ReportMessage(OptionalContext, - fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}", - SortedAttachments.size(), - OpLSNToKey.size(), - GeneratedBlockCount, - LargeAttachmentCount, - NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); + ReportProgress(OptionalContext, + fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), + ChunkAssembleCount, + 0); - if (IsCancelled(OptionalContext)) + ReportMessage(OptionalContext, + fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}", + ChunkAssembleCount, + TotalOpCount, + GeneratedBlockCount, + NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); + + if (IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ptrdiff_t Remaining = BlockCreateLatch.Remaining(); + ReportProgress(OptionalContext, + fmt::format("Aborting, {} blocks remaining...", Remaining), + GeneratedBlockCount, + Remaining); + } + if (GeneratedBlockCount > 0) + { + ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0); + } + return {}; + } + } + catch (std::exception& Ex) { - 
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { - ptrdiff_t Remaining = BlockCreateLatch.Remaining(); - ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining); } - if (GeneratedBlockCount > 0) - { - ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0); - } - return {}; + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), "Block creation failed", Ex.what()); + throw; } BlockCreateLatch.CountDown(); @@ -872,6 +1226,7 @@ BuildContainer(CidStore& ChunkStore, if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); while (!BlockCreateLatch.Wait(1000)) { Remaining = BlockCreateLatch.Remaining(); @@ -885,9 +1240,13 @@ BuildContainer(CidStore& ChunkStore, } ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", Remaining), GeneratedBlockCount, Remaining); } + if (GeneratedBlockCount > 0) { + uint64_t NowMS = Timer.GetElapsedTimeMs(); ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0); + ReportMessage(OptionalContext, + fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS))); } if (!RemoteResult.IsError()) @@ -937,6 +1296,35 @@ BuildContainer(CidStore& ChunkStore, } } OplogContinerWriter.EndArray(); // "blocks"sv + OplogContinerWriter.BeginArray("chunkedfiles"sv); + { + for (const ChunkedFile& F : ChunkedFiles) + { + OplogContinerWriter.BeginObject(); + { + OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash); + OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize); + 
OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes) + { + OplogContinerWriter.AddHash(RawHash); + } + } + OplogContinerWriter.EndArray(); // "chunks" + OplogContinerWriter.BeginArray("sequence"sv); + { + for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence) + { + OplogContinerWriter.AddInteger(ChunkIndex); + } + } + OplogContinerWriter.EndArray(); // "sequence" + } + OplogContinerWriter.EndObject(); + } + } + OplogContinerWriter.EndArray(); // "chunkedfiles"sv OplogContinerWriter.BeginArray("chunks"sv); { @@ -959,11 +1347,12 @@ BuildContainer(CidStore& ChunkStore, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, + size_t ChunkFileSizeLimit, bool BuildBlocks, bool IgnoreMissingAttachments, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, - const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, + const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) { WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); @@ -974,6 +1363,7 @@ BuildContainer(CidStore& ChunkStore, Oplog, MaxBlockSize, MaxChunkEmbedSize, + ChunkFileSizeLimit, BuildBlocks, IgnoreMissingAttachments, {}, @@ -1001,7 +1391,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, CidStore& ChunkStore, RemoteProjectStore& RemoteStore, const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments, - const std::vector<std::vector<IoHash>>& BlockChunks, + const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks, const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks, const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments, const std::unordered_set<IoHash, IoHash::Hasher>& Needs, @@ -1019,11 +1409,12 @@ UploadAttachments(WorkerThreadPool& 
WorkerPool, ReportMessage(OptionalContext, "Filtering needed attachments for upload..."); - std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload; + std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload; + std::unordered_map<IoHash, FetchChunkFunc, IoHash::Hasher> BulkBlockAttachmentsToUpload; - size_t BlockAttachmentCountToUpload = 0; - size_t LargeAttachmentCountToUpload = 0; - std::atomic<ptrdiff_t> BulkAttachmentCountToUpload = 0; + size_t BlockAttachmentCountToUpload = 0; + size_t LargeAttachmentCountToUpload = 0; + size_t BulkAttachmentCountToUpload = 0; AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size()); for (const auto& CreatedBlock : CreatedBlocks) @@ -1042,39 +1433,19 @@ UploadAttachments(WorkerThreadPool& WorkerPool, LargeAttachmentCountToUpload++; } } - for (const std::vector<IoHash>& BlockHashes : BlockChunks) + for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& BlockHashes : BlockChunks) { - if (ForceAll) - { - AttachmentsToUpload.insert(BlockHashes.begin(), BlockHashes.end()); - BulkAttachmentCountToUpload += BlockHashes.size(); - continue; - } - for (const IoHash& Hash : BlockHashes) + for (const std::pair<IoHash, FetchChunkFunc>& Chunk : BlockHashes) { - if (Needs.contains(Hash)) + if (ForceAll || Needs.contains(Chunk.first)) { - AttachmentsToUpload.insert(Hash); + BulkBlockAttachmentsToUpload.insert(std::make_pair(Chunk.first, Chunk.second)); BulkAttachmentCountToUpload++; } } } - for (const IoHash& Needed : Needs) - { - if (!AttachmentsToUpload.contains(Needed)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - "Invalid attachment", - fmt::format("Upload requested an unknown attachment '{}'", Needed)); - ReportMessage( - OptionalContext, - fmt::format("Failed to upload attachment '{}'. 
({}): {}", Needed, RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return; - } - } - - if (AttachmentsToUpload.empty()) + if (AttachmentsToUpload.empty() && BulkBlockAttachmentsToUpload.empty()) { ReportMessage(OptionalContext, "No attachments needed"); return; @@ -1085,30 +1456,29 @@ UploadAttachments(WorkerThreadPool& WorkerPool, if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } return; } ReportMessage(OptionalContext, fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)", - AttachmentsToUpload.size(), + AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), BlockAttachmentCountToUpload, LargeAttachmentCountToUpload, - BulkAttachmentCountToUpload.load())); + BulkAttachmentCountToUpload)); + + Stopwatch Timer; ptrdiff_t AttachmentsToSave(0); Latch SaveAttachmentsLatch(1); - for (const IoHash& RawHash : LargeAttachments) + for (const IoHash& RawHash : AttachmentsToUpload) { if (RemoteResult.IsError()) { break; } - if (!AttachmentsToUpload.contains(RawHash)) - { - continue; - } SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; @@ -1126,10 +1496,12 @@ UploadAttachments(WorkerThreadPool& WorkerPool, { return; } + bool IsBlock = false; IoBuffer Payload; if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) { Payload = BlockIt->second; + IsBlock = true; } else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) { @@ -1161,76 +1533,24 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RemoteResult.GetErrorReason())); return; } - Info.AttachmentsUploaded.fetch_add(1); - Info.AttachmentBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved large attachment '{}' in {} ({})", - RawHash, - 
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); - return; - }); - } - - if (IsCancelled(OptionalContext)) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - } - return; - } - - for (auto& It : CreatedBlocks) - { - if (RemoteResult.IsError()) - { - break; - } - const IoHash& RawHash = It.first; - if (!AttachmentsToUpload.contains(RawHash)) - { - continue; - } - IoBuffer Payload = It.second; - ZEN_ASSERT(Payload); - SaveAttachmentsLatch.AddCount(1); - AttachmentsToSave++; - WorkerPool.ScheduleWork([&ChunkStore, - &RemoteStore, - &SaveAttachmentsLatch, - &RemoteResult, - Payload = std::move(Payload), - RawHash, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) + if (IsBlock) { - return; + Info.AttachmentBlocksUploaded.fetch_add(1); + Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved block attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); } - - size_t PayloadSize = Payload.GetSize(); - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash); - if (Result.ErrorCode) + else { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): {}", - RawHash, - NiceBytes(PayloadSize), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; + Info.AttachmentsUploaded.fetch_add(1); + Info.AttachmentBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); } - - Info.AttachmentBlocksUploaded.fetch_add(1); - 
Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); - - ZEN_INFO("Saved block attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); return; }); } @@ -1240,80 +1560,85 @@ UploadAttachments(WorkerThreadPool& WorkerPool, if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } return; } - for (const std::vector<IoHash>& Chunks : BlockChunks) + if (!BulkBlockAttachmentsToUpload.empty()) { - if (RemoteResult.IsError()) - { - break; - } - - std::vector<IoHash> NeededChunks; - NeededChunks.reserve(Chunks.size()); - for (const IoHash& Chunk : Chunks) + for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& Chunks : BlockChunks) { - if (AttachmentsToUpload.contains(Chunk)) + if (RemoteResult.IsError()) { - NeededChunks.push_back(Chunk); + break; } - } - if (NeededChunks.empty()) - { - continue; - } - SaveAttachmentsLatch.AddCount(1); - AttachmentsToSave++; - WorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &SaveAttachmentsLatch, - &RemoteResult, - &Chunks, - NeededChunks = std::move(NeededChunks), - &BulkAttachmentCountToUpload, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - size_t ChunksSize = 0; - std::vector<SharedBuffer> ChunkBuffers; - ChunkBuffers.reserve(NeededChunks.size()); - for (const IoHash& Chunk : NeededChunks) + std::vector<IoHash> NeededChunks; + NeededChunks.reserve(Chunks.size()); + for (const std::pair<IoHash, FetchChunkFunc>& Chunk : Chunks) { - IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk); - if (!ChunkPayload) + const IoHash& ChunkHash = Chunk.first; + if (BulkBlockAttachmentsToUpload.contains(ChunkHash) && !AttachmentsToUpload.contains(ChunkHash)) { - 
RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), - fmt::format("Missing chunk {}"sv, Chunk), - fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); - ChunkBuffers.clear(); - break; + NeededChunks.push_back(Chunk.first); } - ChunksSize += ChunkPayload.GetSize(); - ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload))); } - RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); - if (Result.ErrorCode) + if (NeededChunks.empty()) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachments with {} chunks ({}): {}", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; + continue; } - Info.AttachmentsUploaded.fetch_add(NeededChunks.size()); - Info.AttachmentBytesUploaded.fetch_add(ChunksSize); - - ZEN_INFO("Saved {} bulk attachments in {} ({})", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(ChunksSize)); - BulkAttachmentCountToUpload.fetch_sub(Chunks.size()); - }); + + SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; + WorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &SaveAttachmentsLatch, + &RemoteResult, + NeededChunks = std::move(NeededChunks), + &BulkBlockAttachmentsToUpload, + &Info, + OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + size_t ChunksSize = 0; + std::vector<SharedBuffer> ChunkBuffers; + ChunkBuffers.reserve(NeededChunks.size()); + for (const IoHash& Chunk : NeededChunks) + { + auto It = BulkBlockAttachmentsToUpload.find(Chunk); + ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); + CompositeBuffer ChunkPayload = It->second(It->first); + if (!ChunkPayload) + { + RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), + fmt::format("Missing chunk {}"sv, Chunk), + fmt::format("Unable to fetch 
attachment {} required by the oplog"sv, Chunk)); + ChunkBuffers.clear(); + break; + } + ChunksSize += ChunkPayload.GetSize(); + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer())); + } + RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachments with {} chunks ({}): {}", + NeededChunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; + } + Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); + Info.AttachmentBytesUploaded.fetch_add(ChunksSize); + + ZEN_INFO("Saved {} bulk attachments in {} ({})", + NeededChunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(ChunksSize)); + }); + } } SaveAttachmentsLatch.CountDown(); @@ -1325,18 +1650,22 @@ UploadAttachments(WorkerThreadPool& WorkerPool, if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } } - ReportProgress( - OptionalContext, - fmt::format("Saving attachments, {} remaining...", BlockChunks.empty() ? 
Remaining : BulkAttachmentCountToUpload.load()), - AttachmentsToSave, - Remaining); + ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", Remaining), AttachmentsToSave, Remaining); } if (AttachmentsToSave > 0) { ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0); } + ReportMessage(OptionalContext, + fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {}", + AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), + BlockAttachmentCountToUpload, + LargeAttachmentCountToUpload, + BulkAttachmentCountToUpload, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()))); } RemoteProjectStore::Result @@ -1346,6 +1675,7 @@ SaveOplog(CidStore& ChunkStore, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, + size_t ChunkFileSizeLimit, bool EmbedLooseFiles, bool ForceUpload, bool IgnoreMissingAttachments, @@ -1409,8 +1739,8 @@ SaveOplog(CidStore& ChunkStore, ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); }; - std::vector<std::vector<IoHash>> BlockChunks; - auto OnBlockChunks = [&BlockChunks](const std::unordered_set<IoHash, IoHash::Hasher>& Chunks) { + std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>> BlockChunks; + auto OnBlockChunks = [&BlockChunks](std::vector<std::pair<IoHash, FetchChunkFunc>>&& Chunks) { BlockChunks.push_back({Chunks.begin(), Chunks.end()}); ZEN_DEBUG("Found {} block chunks", Chunks.size()); }; @@ -1480,11 +1810,16 @@ SaveOplog(CidStore& ChunkStore, } } + // TODO: We need to check if remote store actually *has* all KnownBlocks + // We can't reconstruct known blocks on demand as they may contain chunks that we don't have + // and we don't care about :( + CbObject OplogContainerObject = BuildContainer(ChunkStore, Project, Oplog, MaxBlockSize, MaxChunkEmbedSize, + ChunkFileSizeLimit, RemoteStoreInfo.CreateBlocks, IgnoreMissingAttachments, KnownBlocks, @@ -1504,6 
+1839,7 @@ SaveOplog(CidStore& ChunkStore, RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, .Text = "Operation cancelled"}; + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return Result; } @@ -1550,6 +1886,7 @@ SaveOplog(CidStore& ChunkStore, RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, .Text = "Operation cancelled"}; + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Text)); return Result; } @@ -1629,6 +1966,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, JobContext* OptionalContext) { using namespace std::literals; @@ -1657,10 +1995,11 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); + + std::vector<IoHash> NeededChunks; + NeededChunks.reserve(ChunksArray.Num()); if (BlockHash == IoHash::Zero) { - std::vector<IoHash> NeededChunks; - NeededChunks.reserve(ChunksArray.GetSize()); for (CbFieldView ChunkField : ChunksArray) { IoHash ChunkHash = ChunkField.AsBinaryAttachment(); @@ -1670,27 +2009,55 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } NeededChunks.emplace_back(ChunkHash); } - - if (!NeededChunks.empty()) + } + else + { + for (CbFieldView ChunkField : ChunksArray) { - OnNeedBlock(IoHash::Zero, std::move(NeededChunks)); + const IoHash ChunkHash = ChunkField.AsHash(); + if (HasAttachment(ChunkHash)) + { + continue; + } + NeededChunks.emplace_back(ChunkHash); } - continue; } - for (CbFieldView 
ChunkField : ChunksArray) + if (!NeededChunks.empty()) { - IoHash ChunkHash = ChunkField.AsHash(); - if (HasAttachment(ChunkHash)) + OnNeedBlock(BlockHash, std::move(NeededChunks)); + if (BlockHash != IoHash::Zero) { - continue; + NeedBlockCount++; } + } + } - OnNeedBlock(BlockHash, {}); - NeedBlockCount++; - break; + CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); + for (CbFieldView ChunkedFileField : ChunkedFilesArray) + { + CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView(); + ChunkedInfo Chunked; + Chunked.RawHash = ChunkedFileView["rawhash"sv].AsHash(); + Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64(); + CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView(); + Chunked.ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkField : ChunksArray) + { + const IoHash ChunkHash = ChunkField.AsHash(); + Chunked.ChunkHashes.emplace_back(ChunkHash); } - }; + CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView(); + Chunked.ChunkSequence.reserve(SequenceArray.Num()); + for (CbFieldView SequenceField : SequenceArray) + { + uint32_t SequenceIndex = SequenceField.AsUInt32(); + ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size()); + Chunked.ChunkSequence.push_back(SequenceIndex); + } + OnChunkedAttachment(Chunked); + } + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); @@ -1788,7 +2155,6 @@ LoadOplog(CidStore& ChunkStore, WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); std::unordered_set<IoHash, IoHash::Hasher> Attachments; - std::vector<std::vector<IoHash>> ChunksInBlocks; RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName)); @@ -1815,12 +2181,19 @@ LoadOplog(CidStore& ChunkStore, std::atomic_size_t 
AttachmentCount = 0; auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { - return !ForceDownload && ChunkStore.ContainsChunk(RawHash); + if (ForceDownload) + { + return false; + } + if (ChunkStore.ContainsChunk(RawHash)) + { + return true; + } + return false; }; auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, - &ChunksInBlocks, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult, @@ -1896,6 +2269,7 @@ LoadOplog(CidStore& ChunkStore, &RemoteStore, BlockHash, &RemoteResult, + Chunks = std::move(Chunks), &Info, IgnoreMissingAttachments, OptionalContext]() { @@ -1922,30 +2296,36 @@ LoadOplog(CidStore& ChunkStore, } return; } - Info.AttachmentBlocksDownloaded.fetch_add(1); + if (RemoteResult.IsError()) + { + return; + } uint64_t BlockSize = BlockResult.Bytes.GetSize(); + Info.AttachmentBlocksDownloaded.fetch_add(1); ZEN_INFO("Loaded block attachment '{}' in {} ({})", BlockHash, NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), NiceBytes(BlockSize)); - if (RemoteResult.IsError()) - { - return; - } Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); - + std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; + WantedChunks.reserve(Chunks.size()); + WantedChunks.insert(Chunks.begin(), Chunks.end()); bool StoreChunksOK = - IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - uint64_t ChunkSize = Chunk.GetCompressedSize(); - CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); - if (InsertResult.New) - { - Info.AttachmentBytesStored.fetch_add(ChunkSize); - Info.AttachmentsStored.fetch_add(1); - } - }); - + IterateBlock(std::move(BlockResult.Bytes), + [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + if (WantedChunks.contains(AttachmentRawHash)) + { + uint64_t ChunkSize = Chunk.GetCompressedSize(); + 
CidStore::InsertResult InsertResult = + ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); + } + WantedChunks.erase(AttachmentRawHash); + } + }); if (!StoreChunksOK) { ReportMessage(OptionalContext, @@ -1958,6 +2338,7 @@ LoadOplog(CidStore& ChunkStore, {}); return; } + ZEN_ASSERT(WantedChunks.empty()); }); }; @@ -2027,8 +2408,22 @@ LoadOplog(CidStore& ChunkStore, }); }; - RemoteProjectStore::Result Result = - SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext); + std::vector<ChunkedInfo> FilesToDechunk; + auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) { + if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash)) + { + Oplog.CaptureAddedAttachments(Chunked.ChunkHashes); + FilesToDechunk.push_back(Chunked); + } + }; + + RemoteProjectStore::Result Result = SaveOplogContainer(Oplog, + LoadContainerResult.ContainerObject, + HasAttachment, + OnNeedBlock, + OnNeedAttachment, + OnChunkedAttachment, + OptionalContext); if (Result.ErrorCode != 0) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); @@ -2057,8 +2452,101 @@ LoadOplog(CidStore& ChunkStore, } if (Result.ErrorCode == 0) { + if (!FilesToDechunk.empty()) + { + ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); + + Latch DechunkLatch(1); + std::filesystem::path TempFilePath = Oplog.TempPath(); + for (const ChunkedInfo& Chunked : FilesToDechunk) + { + std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); + DechunkLatch.AddCount(1); + WorkerPool.ScheduleWork([&ChunkStore, &DechunkLatch, TempFileName, &Chunked, &RemoteResult, OptionalContext]() { + auto _ = MakeGuard([&DechunkLatch] { DechunkLatch.CountDown(); }); + if 
(RemoteResult.IsError()) + { + return; + } + Stopwatch Timer; + IoBuffer TmpBuffer; + { + BasicFile TmpFile; + TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate); + { + BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); + + uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); + BLAKE3Stream HashingStream; + for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) + { + const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; + IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); + if (!Chunk) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + "Missing chunk", + fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + ReportMessage(OptionalContext, + fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + return; + } + CompositeBuffer Decompressed = + CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); + for (const SharedBuffer& Segment : Decompressed.GetSegments()) + { + MemoryView SegmentData = Segment.GetView(); + HashingStream.Append(SegmentData); + TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); + Offset += SegmentData.GetSize(); + } + } + BLAKE3 RawHash = HashingStream.GetHash(); + ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); + UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); + TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); + } + TmpFile.Flush(); + uint64_t TmpFileSize = TmpFile.FileSize(); + TmpBuffer = IoBuffer(IoBuffer::File, TmpFile.Detach(), 0, TmpFileSize, /*IsWholeFile*/ true); + IoHash ValidateRawHash; + uint64_t ValidateRawSize = 0; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(TmpBuffer, ValidateRawHash, ValidateRawSize)); + ZEN_ASSERT(ValidateRawHash == Chunked.RawHash); + ZEN_ASSERT(ValidateRawSize == Chunked.RawSize); + } + ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, 
CidStore::InsertMode::kMayBeMovedInPlace); + ZEN_INFO("Dechunked attachment {} ({}) in {}", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + } + DechunkLatch.CountDown(); + + while (!DechunkLatch.Wait(1000)) + { + ptrdiff_t Remaining = DechunkLatch.Remaining(); + if (IsCancelled(OptionalContext)) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + } + } + ReportProgress(OptionalContext, + fmt::format("Dechunking attachments, {} remaining...", Remaining), + FilesToDechunk.size(), + Remaining); + } + ReportProgress(OptionalContext, fmt::format("Dechunking attachments, {} remaining...", 0), FilesToDechunk.size(), 0); + } Result = RemoteResult.ConvertResult(); } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; ReportMessage(OptionalContext, diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index 7254b9d3f..da93f0a27 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -11,6 +11,7 @@ namespace zen { class CidStore; class WorkerThreadPool; +struct ChunkedInfo; class RemoteProjectStore { @@ -84,14 +85,17 @@ public: struct RemoteStoreOptions { - static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u; - static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u; + static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u; + static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u; + static const size_t DefaultChunkFileSizeLimit = 256u * 1024u * 1024u; - size_t MaxBlockSize = DefaultMaxBlockSize; - size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize; + size_t MaxBlockSize = DefaultMaxBlockSize; + size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize; + size_t 
ChunkFileSizeLimit = DefaultChunkFileSizeLimit; }; typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc; +typedef std::function<CompositeBuffer(const IoHash& RawHash)> FetchChunkFunc; RemoteProjectStore::LoadContainerResult BuildContainer( CidStore& ChunkStore, @@ -99,11 +103,12 @@ RemoteProjectStore::LoadContainerResult BuildContainer( ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, + size_t ChunkFileSizeLimit, bool BuildBlocks, bool IgnoreMissingAttachments, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, - const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, + const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles); class JobContext; @@ -112,8 +117,9 @@ RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, - JobContext* OptionalContext); + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + const std::function<void(const ChunkedInfo& Chunked)>& OnChunkedAttachment, + JobContext* OptionalContext); RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, @@ -121,6 +127,7 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, + size_t ChunkFileSizeLimit, bool EmbedLooseFiles, bool ForceUpload, bool IgnoreMissingAttachments, @@ -133,7 +140,7 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, bool IgnoreMissingAttachments, JobContext* OptionalContext); -CompressedBuffer 
GenerateBlock(std::vector<SharedBuffer>&& Chunks); +CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks); bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); } // namespace zen diff --git a/src/zenstore/chunkedfile.cpp b/src/zenstore/chunkedfile.cpp new file mode 100644 index 000000000..0b66c7b9b --- /dev/null +++ b/src/zenstore/chunkedfile.cpp @@ -0,0 +1,505 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/chunkedfile.h> +#include <zenutil/basicfile.h> + +#include "chunking.h" + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +namespace { + struct ChunkedHeader + { + static constexpr uint32_t ExpectedMagic = 0x646b6863; // chkd + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint32_t ChunkSequenceLength; + uint32_t ChunkHashCount; + uint64_t ChunkSequenceOffset; + uint64_t ChunkHashesOffset; + uint64_t RawSize = 0; + IoHash RawHash; + }; +} // namespace + +IoBuffer +SerializeChunkedInfo(const ChunkedInfo& Info) +{ + size_t HeaderSize = RoundUp(sizeof(ChunkedHeader), 16) + RoundUp(sizeof(uint32_t) * Info.ChunkSequence.size(), 16) + + RoundUp(sizeof(IoHash) * Info.ChunkHashes.size(), 16); + IoBuffer HeaderData(HeaderSize); + + ChunkedHeader Header; + Header.ChunkSequenceLength = gsl::narrow<uint32_t>(Info.ChunkSequence.size()); + Header.ChunkHashCount = gsl::narrow<uint32_t>(Info.ChunkHashes.size()); + Header.ChunkSequenceOffset = RoundUp(sizeof(ChunkedHeader), 16); + Header.ChunkHashesOffset = RoundUp(Header.ChunkSequenceOffset + sizeof(uint32_t) * Header.ChunkSequenceLength, 16); + Header.RawSize = Info.RawSize; + Header.RawHash = Info.RawHash; + + MutableMemoryView WriteView = HeaderData.GetMutableView(); + { + MutableMemoryView HeaderWriteView = 
WriteView.Left(sizeof(Header)); + HeaderWriteView.CopyFrom(MemoryView(&Header, sizeof(Header))); + } + { + MutableMemoryView ChunkSequenceWriteView = WriteView.Mid(Header.ChunkSequenceOffset, sizeof(uint32_t) * Header.ChunkSequenceLength); + ChunkSequenceWriteView.CopyFrom(MemoryView(Info.ChunkSequence.data(), ChunkSequenceWriteView.GetSize())); + } + { + MutableMemoryView ChunksWriteView = WriteView.Mid(Header.ChunkHashesOffset, sizeof(IoHash) * Header.ChunkHashCount); + ChunksWriteView.CopyFrom(MemoryView(Info.ChunkHashes.data(), ChunksWriteView.GetSize())); + } + + return HeaderData; +} + +ChunkedInfo +DeserializeChunkedInfo(IoBuffer& Buffer) +{ + MemoryView View = Buffer.GetView(); + ChunkedHeader Header; + { + MutableMemoryView HeaderWriteView(&Header, sizeof(Header)); + HeaderWriteView.CopyFrom(View.Left(sizeof(Header))); + } + if (Header.Magic != ChunkedHeader::ExpectedMagic) + { + return {}; + } + if (Header.Version != ChunkedHeader::CurrentVersion) + { + return {}; + } + ChunkedInfo Info; + Info.RawSize = Header.RawSize; + Info.RawHash = Header.RawHash; + Info.ChunkSequence.resize(Header.ChunkSequenceLength); + Info.ChunkHashes.resize(Header.ChunkHashCount); + { + MutableMemoryView ChunkSequenceWriteView(Info.ChunkSequence.data(), sizeof(uint32_t) * Header.ChunkSequenceLength); + ChunkSequenceWriteView.CopyFrom(View.Mid(Header.ChunkSequenceOffset, ChunkSequenceWriteView.GetSize())); + } + { + MutableMemoryView ChunksWriteView(Info.ChunkHashes.data(), sizeof(IoHash) * Header.ChunkHashCount); + ChunksWriteView.CopyFrom(View.Mid(Header.ChunkHashesOffset, ChunksWriteView.GetSize())); + } + + return Info; +} + +void +Reconstruct(const ChunkedInfo& Info, const std::filesystem::path& TargetPath, std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk) +{ + BasicFile Reconstructed; + Reconstructed.Open(TargetPath, BasicFile::Mode::kTruncate); + BasicFileWriter ReconstructedWriter(Reconstructed, 64 * 1024); + uint64_t Offset = 0; + for (uint32_t SequenceIndex : 
Info.ChunkSequence) + { + IoBuffer Chunk = GetChunk(Info.ChunkHashes[SequenceIndex]); + ReconstructedWriter.Write(Chunk.GetData(), Chunk.GetSize(), Offset); + Offset += Chunk.GetSize(); + } +} + +ChunkedInfoWithSource +ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params) +{ + ChunkedInfoWithSource Result; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> FoundChunks; + + ZenChunkHelper Chunker; + Chunker.SetUseThreshold(Params.UseThreshold); + Chunker.SetChunkSize(Params.MinSize, Params.MaxSize, Params.AvgSize); + size_t End = Offset + Size; + const size_t ScanBufferSize = 1u * 1024 * 1024; // (Params.MaxSize * 9) / 3;//1 * 1024 * 1024; + BasicFileBuffer RawBuffer(RawData, ScanBufferSize); + MemoryView SliceView = RawBuffer.MakeView(Min(End - Offset, ScanBufferSize), Offset); + ZEN_ASSERT(!SliceView.IsEmpty()); + size_t SliceSize = SliceView.GetSize(); + IoHashStream RawHashStream; + while (Offset < End) + { + size_t ScanLength = Chunker.ScanChunk(SliceView.GetData(), SliceSize); + if (ScanLength == ZenChunkHelper::kNoBoundaryFound) + { + if (Offset + SliceSize == End) + { + ScanLength = SliceSize; + } + else + { + SliceView = RawBuffer.MakeView(Min(End - Offset, ScanBufferSize), Offset); + SliceSize = SliceView.GetSize(); + Chunker.Reset(); + continue; + } + } + uint32_t ChunkLength = gsl::narrow<uint32_t>(ScanLength); // +HashedLength); + MemoryView ChunkView = SliceView.Left(ScanLength); + RawHashStream.Append(ChunkView); + IoHash ChunkHash = IoHash::HashBuffer(ChunkView); + SliceView.RightChopInline(ScanLength); + if (auto It = FoundChunks.find(ChunkHash); It != FoundChunks.end()) + { + Result.Info.ChunkSequence.push_back(It->second); + } + else + { + uint32_t ChunkIndex = gsl::narrow<uint32_t>(Result.Info.ChunkHashes.size()); + FoundChunks.insert_or_assign(ChunkHash, ChunkIndex); + Result.Info.ChunkHashes.push_back(ChunkHash); + Result.ChunkSources.push_back(ChunkSource{.Offset = Offset, .Size = ChunkLength}); + 
Result.Info.ChunkSequence.push_back(ChunkIndex); + } + + SliceSize = SliceView.GetSize(); + Offset += ChunkLength; + } + Result.Info.RawSize = Size; + Result.Info.RawHash = RawHashStream.GetHash(); + return Result; +} + +} // namespace zen + +#if ZEN_WITH_TESTS +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iohash.h> +# include <zencore/logging.h> +# include <zencore/scopeguard.h> +# include <zencore/timer.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> + +# include "chunking.h" + +ZEN_THIRD_PARTY_INCLUDES_START +# include <tsl/robin_map.h> +# include <tsl/robin_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { +# if 0 +TEST_CASE("chunkedfile.findparams") +{ +# if 1 + DirectoryContent SourceContent1; + GetDirectoryContent("E:\\Temp\\ChunkingTestData\\31379208", DirectoryContent::IncludeFilesFlag, SourceContent1); + const std::vector<std::filesystem::path>& SourceFiles1 = SourceContent1.Files; + DirectoryContent SourceContent2; + GetDirectoryContent("E:\\Temp\\ChunkingTestData\\31379208_2", DirectoryContent::IncludeFilesFlag, SourceContent2); + const std::vector<std::filesystem::path>& SourceFiles2 = SourceContent2.Files; +# else + std::filesystem::path SourcePath1 = + "E:\\Temp\\ChunkingTestData\\31375996\\ShaderArchive-FortniteGame_Chunk10-PCD3D_SM6-PCD3D_SM6.ushaderbytecode"; + std::filesystem::path SourcePath2 = + "E:\\Temp\\ChunkingTestData\\31379208\\ShaderArchive-FortniteGame_Chunk10-PCD3D_SM6-PCD3D_SM6.ushaderbytecode"; + const std::vector<std::filesystem::path>& SourceFiles1 = {SourcePath1}; + const std::vector<std::filesystem::path>& SourceFiles2 = {SourcePath2}; +# endif + ChunkedParams Params[] = {ChunkedParams{.UseThreshold = false, .MinSize = 17280, .MaxSize = 139264, .AvgSize = 36340}, + ChunkedParams{.UseThreshold = false, .MinSize = 15456, .MaxSize = 122880, .AvgSize = 35598}, + ChunkedParams{.UseThreshold = false, .MinSize = 16848, .MaxSize = 
135168, .AvgSize = 39030}, + ChunkedParams{.UseThreshold = false, .MinSize = 14256, .MaxSize = 114688, .AvgSize = 36222}, + ChunkedParams{.UseThreshold = false, .MinSize = 15744, .MaxSize = 126976, .AvgSize = 36600}, + ChunkedParams{.UseThreshold = false, .MinSize = 15264, .MaxSize = 122880, .AvgSize = 35442}, + ChunkedParams{.UseThreshold = false, .MinSize = 16464, .MaxSize = 131072, .AvgSize = 37950}, + ChunkedParams{.UseThreshold = false, .MinSize = 15408, .MaxSize = 122880, .AvgSize = 38914}, + ChunkedParams{.UseThreshold = false, .MinSize = 15408, .MaxSize = 122880, .AvgSize = 35556}, + ChunkedParams{.UseThreshold = false, .MinSize = 15360, .MaxSize = 122880, .AvgSize = 35520}, + ChunkedParams{.UseThreshold = false, .MinSize = 15312, .MaxSize = 122880, .AvgSize = 35478}, + ChunkedParams{.UseThreshold = false, .MinSize = 16896, .MaxSize = 135168, .AvgSize = 39072}, + ChunkedParams{.UseThreshold = false, .MinSize = 15360, .MaxSize = 122880, .AvgSize = 38880}, + ChunkedParams{.UseThreshold = false, .MinSize = 15840, .MaxSize = 126976, .AvgSize = 36678}, + ChunkedParams{.UseThreshold = false, .MinSize = 16800, .MaxSize = 135168, .AvgSize = 38994}, + ChunkedParams{.UseThreshold = false, .MinSize = 15888, .MaxSize = 126976, .AvgSize = 36714}, + ChunkedParams{.UseThreshold = false, .MinSize = 15792, .MaxSize = 126976, .AvgSize = 36636}, + ChunkedParams{.UseThreshold = false, .MinSize = 14880, .MaxSize = 118784, .AvgSize = 37609}, + ChunkedParams{.UseThreshold = false, .MinSize = 15936, .MaxSize = 126976, .AvgSize = 36756}, + ChunkedParams{.UseThreshold = false, .MinSize = 15456, .MaxSize = 122880, .AvgSize = 38955}, + ChunkedParams{.UseThreshold = false, .MinSize = 15984, .MaxSize = 126976, .AvgSize = 36792}, + ChunkedParams{.UseThreshold = false, .MinSize = 14400, .MaxSize = 114688, .AvgSize = 36338}, + ChunkedParams{.UseThreshold = false, .MinSize = 14832, .MaxSize = 118784, .AvgSize = 37568}, + ChunkedParams{.UseThreshold = false, .MinSize = 16944, .MaxSize = 
135168, .AvgSize = 39108}, + ChunkedParams{.UseThreshold = false, .MinSize = 14352, .MaxSize = 114688, .AvgSize = 36297}, + ChunkedParams{.UseThreshold = false, .MinSize = 14208, .MaxSize = 114688, .AvgSize = 36188}, + ChunkedParams{.UseThreshold = false, .MinSize = 14448, .MaxSize = 114688, .AvgSize = 36372}, + ChunkedParams{.UseThreshold = false, .MinSize = 13296, .MaxSize = 106496, .AvgSize = 36592}, + ChunkedParams{.UseThreshold = false, .MinSize = 15264, .MaxSize = 122880, .AvgSize = 38805}, + ChunkedParams{.UseThreshold = false, .MinSize = 14304, .MaxSize = 114688, .AvgSize = 36263}, + ChunkedParams{.UseThreshold = false, .MinSize = 14784, .MaxSize = 118784, .AvgSize = 37534}, + ChunkedParams{.UseThreshold = false, .MinSize = 15312, .MaxSize = 122880, .AvgSize = 38839}, + ChunkedParams{.UseThreshold = false, .MinSize = 14256, .MaxSize = 114688, .AvgSize = 39360}, + ChunkedParams{.UseThreshold = false, .MinSize = 13776, .MaxSize = 110592, .AvgSize = 37976}, + ChunkedParams{.UseThreshold = false, .MinSize = 14736, .MaxSize = 118784, .AvgSize = 37493}, + ChunkedParams{.UseThreshold = false, .MinSize = 14928, .MaxSize = 118784, .AvgSize = 37643}, + ChunkedParams{.UseThreshold = false, .MinSize = 14448, .MaxSize = 114688, .AvgSize = 39504}, + ChunkedParams{.UseThreshold = false, .MinSize = 13392, .MaxSize = 106496, .AvgSize = 36664}, + ChunkedParams{.UseThreshold = false, .MinSize = 13872, .MaxSize = 110592, .AvgSize = 38048}, + ChunkedParams{.UseThreshold = false, .MinSize = 14352, .MaxSize = 114688, .AvgSize = 39432}, + ChunkedParams{.UseThreshold = false, .MinSize = 13200, .MaxSize = 106496, .AvgSize = 36520}, + ChunkedParams{.UseThreshold = false, .MinSize = 17328, .MaxSize = 139264, .AvgSize = 36378}, + ChunkedParams{.UseThreshold = false, .MinSize = 17376, .MaxSize = 139264, .AvgSize = 36421}, + ChunkedParams{.UseThreshold = false, .MinSize = 17424, .MaxSize = 139264, .AvgSize = 36459}, + ChunkedParams{.UseThreshold = false, .MinSize = 17472, .MaxSize = 
139264, .AvgSize = 36502}, + ChunkedParams{.UseThreshold = false, .MinSize = 17520, .MaxSize = 139264, .AvgSize = 36540}, + ChunkedParams{.UseThreshold = false, .MinSize = 17808, .MaxSize = 143360, .AvgSize = 37423}, + ChunkedParams{.UseThreshold = false, .MinSize = 17856, .MaxSize = 143360, .AvgSize = 37466}, + ChunkedParams{.UseThreshold = false, .MinSize = 18000, .MaxSize = 143360, .AvgSize = 25834}, + ChunkedParams{.UseThreshold = false, .MinSize = 18000, .MaxSize = 143360, .AvgSize = 21917}, + ChunkedParams{.UseThreshold = false, .MinSize = 18000, .MaxSize = 143360, .AvgSize = 29751}, + ChunkedParams{.UseThreshold = false, .MinSize = 18000, .MaxSize = 143360, .AvgSize = 33668}, + ChunkedParams{.UseThreshold = false, .MinSize = 17952, .MaxSize = 143360, .AvgSize = 37547}, + ChunkedParams{.UseThreshold = false, .MinSize = 17904, .MaxSize = 143360, .AvgSize = 37504}, + ChunkedParams{.UseThreshold = false, .MinSize = 18336, .MaxSize = 147456, .AvgSize = 22371}, + ChunkedParams{.UseThreshold = false, .MinSize = 18000, .MaxSize = 143360, .AvgSize = 37585}, + ChunkedParams{.UseThreshold = false, .MinSize = 18336, .MaxSize = 147456, .AvgSize = 26406}, + ChunkedParams{.UseThreshold = false, .MinSize = 18384, .MaxSize = 147456, .AvgSize = 26450}, + ChunkedParams{.UseThreshold = false, .MinSize = 18528, .MaxSize = 147456, .AvgSize = 30615}, + ChunkedParams{.UseThreshold = false, .MinSize = 18336, .MaxSize = 147456, .AvgSize = 30441}, + ChunkedParams{.UseThreshold = false, .MinSize = 18384, .MaxSize = 147456, .AvgSize = 22417}, + ChunkedParams{.UseThreshold = false, .MinSize = 18528, .MaxSize = 147456, .AvgSize = 22557}, + ChunkedParams{.UseThreshold = false, .MinSize = 18432, .MaxSize = 147456, .AvgSize = 30528}, + ChunkedParams{.UseThreshold = false, .MinSize = 18816, .MaxSize = 151552, .AvgSize = 27112}, + ChunkedParams{.UseThreshold = false, .MinSize = 18528, .MaxSize = 147456, .AvgSize = 34644}, + ChunkedParams{.UseThreshold = false, .MinSize = 18336, .MaxSize = 
147456, .AvgSize = 34476}, + ChunkedParams{.UseThreshold = false, .MinSize = 18816, .MaxSize = 151552, .AvgSize = 35408}, + ChunkedParams{.UseThreshold = false, .MinSize = 18432, .MaxSize = 147456, .AvgSize = 38592}, + ChunkedParams{.UseThreshold = false, .MinSize = 18384, .MaxSize = 147456, .AvgSize = 30483}, + ChunkedParams{.UseThreshold = false, .MinSize = 18528, .MaxSize = 147456, .AvgSize = 26586}, + ChunkedParams{.UseThreshold = false, .MinSize = 18432, .MaxSize = 147456, .AvgSize = 26496}, + ChunkedParams{.UseThreshold = false, .MinSize = 18864, .MaxSize = 151552, .AvgSize = 31302}, + ChunkedParams{.UseThreshold = false, .MinSize = 18384, .MaxSize = 147456, .AvgSize = 34516}, + ChunkedParams{.UseThreshold = false, .MinSize = 18816, .MaxSize = 151552, .AvgSize = 22964}, + ChunkedParams{.UseThreshold = false, .MinSize = 18864, .MaxSize = 151552, .AvgSize = 35448}, + ChunkedParams{.UseThreshold = false, .MinSize = 18480, .MaxSize = 147456, .AvgSize = 38630}, + ChunkedParams{.UseThreshold = false, .MinSize = 18864, .MaxSize = 151552, .AvgSize = 23010}, + ChunkedParams{.UseThreshold = false, .MinSize = 18816, .MaxSize = 151552, .AvgSize = 31260}, + ChunkedParams{.UseThreshold = false, .MinSize = 18480, .MaxSize = 147456, .AvgSize = 34600}, + ChunkedParams{.UseThreshold = false, .MinSize = 18864, .MaxSize = 151552, .AvgSize = 27156}, + ChunkedParams{.UseThreshold = false, .MinSize = 18480, .MaxSize = 147456, .AvgSize = 30570}, + ChunkedParams{.UseThreshold = false, .MinSize = 18384, .MaxSize = 147456, .AvgSize = 38549}, + ChunkedParams{.UseThreshold = false, .MinSize = 18480, .MaxSize = 147456, .AvgSize = 22510}, + ChunkedParams{.UseThreshold = false, .MinSize = 18528, .MaxSize = 147456, .AvgSize = 38673}, + ChunkedParams{.UseThreshold = false, .MinSize = 18432, .MaxSize = 147456, .AvgSize = 34560}, + ChunkedParams{.UseThreshold = false, .MinSize = 18432, .MaxSize = 147456, .AvgSize = 22464}, + ChunkedParams{.UseThreshold = false, .MinSize = 18480, .MaxSize = 
147456, .AvgSize = 26540}, + ChunkedParams{.UseThreshold = false, .MinSize = 18336, .MaxSize = 147456, .AvgSize = 38511}, + ChunkedParams{.UseThreshold = false, .MinSize = 18912, .MaxSize = 151552, .AvgSize = 23057}, + ChunkedParams{.UseThreshold = false, .MinSize = 18912, .MaxSize = 151552, .AvgSize = 27202}, + ChunkedParams{.UseThreshold = false, .MinSize = 18912, .MaxSize = 151552, .AvgSize = 31347}, + ChunkedParams{.UseThreshold = false, .MinSize = 18912, .MaxSize = 151552, .AvgSize = 35492}, + ChunkedParams{.UseThreshold = false, .MinSize = 18960, .MaxSize = 151552, .AvgSize = 31389}, + ChunkedParams{.UseThreshold = false, .MinSize = 18960, .MaxSize = 151552, .AvgSize = 27246}, + ChunkedParams{.UseThreshold = false, .MinSize = 18960, .MaxSize = 151552, .AvgSize = 23103}, + ChunkedParams{.UseThreshold = false, .MinSize = 18960, .MaxSize = 151552, .AvgSize = 35532}, + ChunkedParams{.UseThreshold = false, .MinSize = 19008, .MaxSize = 151552, .AvgSize = 23150}, + ChunkedParams{.UseThreshold = false, .MinSize = 19008, .MaxSize = 151552, .AvgSize = 27292}, + ChunkedParams{.UseThreshold = false, .MinSize = 19008, .MaxSize = 151552, .AvgSize = 31434}, + ChunkedParams{.UseThreshold = false, .MinSize = 19008, .MaxSize = 151552, .AvgSize = 35576}, + ChunkedParams{.UseThreshold = false, .MinSize = 19056, .MaxSize = 151552, .AvgSize = 27336}, + ChunkedParams{.UseThreshold = false, .MinSize = 19056, .MaxSize = 151552, .AvgSize = 23196}, + ChunkedParams{.UseThreshold = false, .MinSize = 19056, .MaxSize = 151552, .AvgSize = 31476}, + ChunkedParams{.UseThreshold = false, .MinSize = 19056, .MaxSize = 151552, .AvgSize = 35616}, + ChunkedParams{.UseThreshold = false, .MinSize = 19344, .MaxSize = 155648, .AvgSize = 27862}, + ChunkedParams{.UseThreshold = false, .MinSize = 19344, .MaxSize = 155648, .AvgSize = 32121}, + ChunkedParams{.UseThreshold = false, .MinSize = 19344, .MaxSize = 155648, .AvgSize = 23603}, + ChunkedParams{.UseThreshold = false, .MinSize = 19344, .MaxSize = 
155648, .AvgSize = 36380}, + ChunkedParams{.UseThreshold = false, .MinSize = 19392, .MaxSize = 155648, .AvgSize = 27908}, + ChunkedParams{.UseThreshold = false, .MinSize = 19392, .MaxSize = 155648, .AvgSize = 23650}, + ChunkedParams{.UseThreshold = false, .MinSize = 19392, .MaxSize = 155648, .AvgSize = 32166}, + ChunkedParams{.UseThreshold = false, .MinSize = 19392, .MaxSize = 155648, .AvgSize = 36424}, + ChunkedParams{.UseThreshold = false, .MinSize = 19440, .MaxSize = 155648, .AvgSize = 23696}, + ChunkedParams{.UseThreshold = false, .MinSize = 19488, .MaxSize = 155648, .AvgSize = 32253}, + ChunkedParams{.UseThreshold = false, .MinSize = 19440, .MaxSize = 155648, .AvgSize = 32208}, + ChunkedParams{.UseThreshold = false, .MinSize = 19488, .MaxSize = 155648, .AvgSize = 23743}, + ChunkedParams{.UseThreshold = false, .MinSize = 19536, .MaxSize = 155648, .AvgSize = 36548}, + ChunkedParams{.UseThreshold = false, .MinSize = 19536, .MaxSize = 155648, .AvgSize = 28042}, + ChunkedParams{.UseThreshold = false, .MinSize = 19536, .MaxSize = 155648, .AvgSize = 23789}, + ChunkedParams{.UseThreshold = false, .MinSize = 19536, .MaxSize = 155648, .AvgSize = 32295}, + ChunkedParams{.UseThreshold = false, .MinSize = 19488, .MaxSize = 155648, .AvgSize = 36508}, + ChunkedParams{.UseThreshold = false, .MinSize = 19440, .MaxSize = 155648, .AvgSize = 27952}, + ChunkedParams{.UseThreshold = false, .MinSize = 19488, .MaxSize = 155648, .AvgSize = 27998}, + ChunkedParams{.UseThreshold = false, .MinSize = 19440, .MaxSize = 155648, .AvgSize = 36464}}; + + static const size_t ParamsCount = sizeof(Params) / sizeof(ChunkedParams); + std::vector<ChunkedInfoWithSource> Infos1(SourceFiles1.size()); + std::vector<ChunkedInfoWithSource> Infos2(SourceFiles2.size()); + + WorkerThreadPool WorkerPool(32); + + for (size_t I = 0; I < ParamsCount; I++) + { + for (int UseThreshold = 0; UseThreshold < 2; UseThreshold++) + { + Latch WorkLatch(1); + ChunkedParams Param = Params[I]; + Param.UseThreshold = 
UseThreshold == 1; + Stopwatch Timer; + for (size_t F = 0; F < SourceFiles1.size(); F++) + { + WorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&WorkLatch, F, Param, &SourceFiles1, &Infos1]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + BasicFile SourceData1; + SourceData1.Open(SourceFiles1[F], BasicFile::Mode::kRead); + Infos1[F] = ChunkData(SourceData1, 0, SourceData1.FileSize(), Param); + }); + } + for (size_t F = 0; F < SourceFiles2.size(); F++) + { + WorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&WorkLatch, F, Param, &SourceFiles2, &Infos2]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + BasicFile SourceData2; + SourceData2.Open(SourceFiles2[F], BasicFile::Mode::kRead); + Infos2[F] = ChunkData(SourceData2, 0, SourceData2.FileSize(), Param); + }); + } + WorkLatch.CountDown(); + WorkLatch.Wait(); + uint64_t ChunkTimeMS = Timer.GetElapsedTimeMs(); + + uint64_t Raw1Size = 0; + tsl::robin_set<IoHash> Chunks1; + size_t ChunkedSize1 = 0; + for (size_t F = 0; F < SourceFiles1.size(); F++) + { + const ChunkedInfoWithSource& Info = Infos1[F]; + Raw1Size += Info.Info.RawSize; + for (uint32_t Chunk1Index = 0; Chunk1Index < Info.Info.ChunkHashes.size(); ++Chunk1Index) + { + const IoHash ChunkHash = Info.Info.ChunkHashes[Chunk1Index]; + if (Chunks1.insert(ChunkHash).second) + { + ChunkedSize1 += Info.ChunkSources[Chunk1Index].Size; + } + } + } + + uint64_t Raw2Size = 0; + tsl::robin_set<IoHash> Chunks2; + size_t ChunkedSize2 = 0; + size_t DiffSize = 0; + for (size_t F = 0; F < SourceFiles2.size(); F++) + { + const ChunkedInfoWithSource& Info = Infos2[F]; + Raw2Size += Info.Info.RawSize; + for (uint32_t Chunk2Index = 0; Chunk2Index < Info.Info.ChunkHashes.size(); ++Chunk2Index) + { + const IoHash ChunkHash = Info.Info.ChunkHashes[Chunk2Index]; + if (Chunks2.insert(ChunkHash).second) + { + ChunkedSize2 += Info.ChunkSources[Chunk2Index].Size; + if (!Chunks1.contains(ChunkHash)) + { + DiffSize += 
Info.ChunkSources[Chunk2Index].Size; + } + } + } + } + + ZEN_INFO( + "Diff = {}, Chunks1 = {}, Chunks2 = {}, .UseThreshold = {}, .MinSize = {}, .MaxSize = {}, .AvgSize = {}, RawSize(1) = {}, " + "RawSize(2) = {}, " + "Saved(1) = {}, Saved(2) = {} in {}", + NiceBytes(DiffSize), + Chunks1.size(), + Chunks2.size(), + Param.UseThreshold, + Param.MinSize, + Param.MaxSize, + Param.AvgSize, + NiceBytes(Raw1Size), + NiceBytes(Raw2Size), + NiceBytes(Raw1Size - ChunkedSize1), + NiceBytes(Raw2Size - ChunkedSize2), + NiceTimeSpanMs(ChunkTimeMS)); + } + } + +# if 0 + for (int64_t MinSizeBase = (12u * 1024u); MinSizeBase <= (32u * 1024u); MinSizeBase += 512) + { + for (int64_t Wiggle = -132; Wiggle < 126; Wiggle += 2) + { + // size_t MinSize = 7 * 1024 - 61; // (size_t)(MinSizeBase + Wiggle); + // size_t MaxSize = 16 * (7 * 1024); // 8 * 7 * 1024;// MinSizeBase * 6; + // size_t AvgSize = MaxSize / 2; // 4 * 7 * 1024;// MinSizeBase * 3; + size_t MinSize = (size_t)(MinSizeBase + Wiggle); + //for (size_t MaxSize = (MinSize * 4) - 768; MaxSize < (MinSize * 5) + 768; MaxSize += 64) + size_t MaxSize = 8u * MinSizeBase; + { + for (size_t AvgSize = (MaxSize - MinSize) / 32 + MinSize; AvgSize < (MaxSize - MinSize) / 4 + MinSize; AvgSize += (MaxSize - MinSize) / 32) +// size_t AvgSize = (MaxSize - MinSize) / 4 + MinSize; + { + WorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&WorkLatch, MinSize, MaxSize, AvgSize, SourcePath1, SourcePath2]() + { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + ChunkedParams Params{ .UseThreshold = true, .MinSize = MinSize, .MaxSize = MaxSize, .AvgSize = AvgSize }; + BasicFile SourceData1; + SourceData1.Open(SourcePath1, BasicFile::Mode::kRead); + BasicFile SourceData2; + SourceData2.Open(SourcePath2, BasicFile::Mode::kRead); + ChunkedInfoWithSource Info1 = ChunkData(SourceData1, Params); + ChunkedInfoWithSource Info2 = ChunkData(SourceData2, Params); + + tsl::robin_set<IoHash> Chunks1; + Chunks1.reserve(Info1.Info.ChunkHashes.size()); + 
Chunks1.insert(Info1.Info.ChunkHashes.begin(), Info1.Info.ChunkHashes.end()); + size_t ChunkedSize1 = 0; + for (uint32_t Chunk1Index = 0; Chunk1Index < Info1.Info.ChunkHashes.size(); ++Chunk1Index) + { + ChunkedSize1 += Info1.ChunkSources[Chunk1Index].Size; + } + size_t DiffSavedSize = 0; + size_t ChunkedSize2 = 0; + for (uint32_t Chunk2Index = 0; Chunk2Index < Info2.Info.ChunkHashes.size(); ++Chunk2Index) + { + ChunkedSize2 += Info2.ChunkSources[Chunk2Index].Size; + if (Chunks1.find(Info2.Info.ChunkHashes[Chunk2Index]) == Chunks1.end()) + { + DiffSavedSize += Info2.ChunkSources[Chunk2Index].Size; + } + } + ZEN_INFO("Diff {}, Chunks1: {}, Chunks2: {}, Min: {}, Max: {}, Avg: {}, Saved(1) {}, Saved(2) {}", + NiceBytes(DiffSavedSize), + Info1.Info.ChunkHashes.size(), + Info2.Info.ChunkHashes.size(), + MinSize, + MaxSize, + AvgSize, + NiceBytes(Info1.Info.RawSize - ChunkedSize1), + NiceBytes(Info2.Info.RawSize - ChunkedSize2)); + }); + } + } + } + } +# endif // 0 + + // WorkLatch.CountDown(); + // WorkLatch.Wait(); +} +# endif // 0 + +void +chunkedfile_forcelink() +{ +} + +} // namespace zen + +#endif diff --git a/src/zenstore/chunking.cpp b/src/zenstore/chunking.cpp index 80674de0a..30edd322a 100644 --- a/src/zenstore/chunking.cpp +++ b/src/zenstore/chunking.cpp @@ -36,7 +36,7 @@ static const uint32_t BuzhashTable[] = { }; // ROL operation (compiler turns this into a ROL when optimizing) -static inline uint32_t +ZEN_FORCEINLINE static uint32_t Rotate32(uint32_t Value, size_t RotateCount) { RotateCount &= 31; diff --git a/src/zenstore/include/zenstore/chunkedfile.h b/src/zenstore/include/zenstore/chunkedfile.h new file mode 100644 index 000000000..c6330bdbd --- /dev/null +++ b/src/zenstore/include/zenstore/chunkedfile.h @@ -0,0 +1,54 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/zencore.h> + +#include <functional> +#include <vector> + +namespace zen { + +class BasicFile; + +struct ChunkedInfo +{ + uint64_t RawSize = 0; + IoHash RawHash; + std::vector<uint32_t> ChunkSequence; + std::vector<IoHash> ChunkHashes; +}; + +struct ChunkSource +{ + uint64_t Offset; // 8 + uint32_t Size; // 4 +}; + +struct ChunkedInfoWithSource +{ + ChunkedInfo Info; + std::vector<ChunkSource> ChunkSources; +}; + +struct ChunkedParams +{ + bool UseThreshold = true; + size_t MinSize = (2u * 1024u) - 128u; + size_t MaxSize = (16u * 1024u); + size_t AvgSize = (3u * 1024u); +}; + +static const ChunkedParams UShaderByteCodeParams = {.UseThreshold = true, .MinSize = 17280, .MaxSize = 139264, .AvgSize = 36340}; + +ChunkedInfoWithSource ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params = {}); +void Reconstruct(const ChunkedInfo& Info, + const std::filesystem::path& TargetPath, + std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk); +IoBuffer SerializeChunkedInfo(const ChunkedInfo& Info); +ChunkedInfo DeserializeChunkedInfo(IoBuffer& Buffer); + +void chunkedfile_forcelink(); +} // namespace zen diff --git a/src/zenutil/basicfile.cpp b/src/zenutil/basicfile.cpp index ad98bf652..f553fe5a0 100644 --- a/src/zenutil/basicfile.cpp +++ b/src/zenutil/basicfile.cpp @@ -272,7 +272,7 @@ BasicFile::Write(CompositeBuffer Data, uint64_t FileOffset, std::error_code& Ec) for (const SharedBuffer& Buffer : Data.GetSegments()) { MemoryView BlockView = Buffer.GetView(); - Write(BlockView, FileOffset, Ec); + Write(BlockView, FileOffset + WrittenBytes, Ec); if (Ec) { @@ -490,6 +490,14 @@ BasicFile::SetFileSize(uint64_t FileSize) #endif } +void +BasicFile::Attach(void* Handle) +{ + ZEN_ASSERT(Handle != nullptr); + ZEN_ASSERT(m_FileHandle == nullptr); + m_FileHandle = Handle; +} + void* BasicFile::Detach() { @@ -716,7 +724,7 @@ BasicFileWriter::~BasicFileWriter() } 
void -BasicFileWriter::Write(void* Data, uint64_t Size, uint64_t FileOffset) +BasicFileWriter::Write(const void* Data, uint64_t Size, uint64_t FileOffset) { if (m_Buffer == nullptr || (Size >= m_BufferSize)) { diff --git a/src/zenutil/include/zenutil/basicfile.h b/src/zenutil/include/zenutil/basicfile.h index f25d9f23c..0e4295ee3 100644 --- a/src/zenutil/include/zenutil/basicfile.h +++ b/src/zenutil/include/zenutil/basicfile.h @@ -65,6 +65,7 @@ public: void SetFileSize(uint64_t FileSize); IoBuffer ReadAll(); void WriteAll(IoBuffer Data, std::error_code& Ec); + void Attach(void* Handle); void* Detach(); inline void* Handle() { return m_FileHandle; } @@ -165,7 +166,7 @@ public: BasicFileWriter(BasicFile& Base, uint64_t BufferSize); ~BasicFileWriter(); - void Write(void* Data, uint64_t Size, uint64_t FileOffset); + void Write(const void* Data, uint64_t Size, uint64_t FileOffset); void Flush(); private: |