diff options
| author | Dan Engelbrecht <[email protected]> | 2024-02-05 16:52:42 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-02-05 16:52:42 +0100 |
| commit | 222392fff48e1659ec8bc59e42de09f3625111ad (patch) | |
| tree | 4e17c75beabd7704f2bca3feb26d0aa9e7de204e /src | |
| parent | 5.4.1-pre0 (diff) | |
| download | zen-222392fff48e1659ec8bc59e42de09f3625111ad.tar.xz zen-222392fff48e1659ec8bc59e42de09f3625111ad.zip | |
compress large attachments on demand (#647)
- Improvement: Speed up oplog export by fetching/compressing big attachments on demand
- Improvement: Speed up oplog export by batch-fetching small attachments
- Improvement: Speed up oplog import by batching writes of oplog ops
- Improvement: Tweak oplog export default block size and embed size limit
- Improvement: Add more messaging and progress during oplog import/export
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/compositebuffer.cpp | 35 | ||||
| -rw-r--r-- | src/zencore/include/zencore/stream.h | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 183 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 5 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 710 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 14 |
6 files changed, 561 insertions, 390 deletions
diff --git a/src/zencore/compositebuffer.cpp b/src/zencore/compositebuffer.cpp index f29f6e810..583ef19c6 100644 --- a/src/zencore/compositebuffer.cpp +++ b/src/zencore/compositebuffer.cpp @@ -155,7 +155,11 @@ CompositeBuffer::ViewOrCopyRange(Iterator& It, uint64_t Size, UniqueBuffer& Copy // A hot path for this code is when we call CompressedBuffer::FromCompressed which // is only interested in reading the header (first 64 bytes or so) and then throws // away the materialized data. - MutableMemoryView WriteView; + if (CopyBuffer.GetSize() < Size) + { + CopyBuffer = UniqueBuffer::Alloc(Size); + } + MutableMemoryView WriteView = CopyBuffer.GetMutableView(); size_t SegmentCount = m_Segments.size(); ZEN_ASSERT(It.SegmentIndex < SegmentCount); uint64_t SizeLeft = Size; @@ -163,31 +167,10 @@ CompositeBuffer::ViewOrCopyRange(Iterator& It, uint64_t Size, UniqueBuffer& Copy { const SharedBuffer& Segment = m_Segments[It.SegmentIndex]; size_t SegmentSize = Segment.GetSize(); - if (Size == SizeLeft && Size <= (SegmentSize - It.OffsetInSegment)) - { - IoBuffer SubSegment(Segment.AsIoBuffer(), It.OffsetInSegment, SizeLeft); - MemoryView View = SubSegment.GetView(); - It.OffsetInSegment += SizeLeft; - ZEN_ASSERT_SLOW(It.OffsetInSegment <= SegmentSize); - if (It.OffsetInSegment == SegmentSize) - { - It.SegmentIndex++; - It.OffsetInSegment = 0; - } - return View; - } - if (WriteView.GetSize() == 0) - { - if (CopyBuffer.GetSize() < Size) - { - CopyBuffer = UniqueBuffer::Alloc(Size); - } - WriteView = CopyBuffer.GetMutableView(); - } - size_t CopySize = zen::Min(SegmentSize - It.OffsetInSegment, SizeLeft); - IoBuffer SubSegment(Segment.AsIoBuffer(), It.OffsetInSegment, CopySize); - MemoryView ReadView = SubSegment.GetView(); - WriteView = WriteView.CopyFrom(ReadView); + size_t CopySize = zen::Min(SegmentSize - It.OffsetInSegment, SizeLeft); + IoBuffer SubSegment(Segment.AsIoBuffer(), It.OffsetInSegment, CopySize); + MemoryView ReadView = SubSegment.GetView(); + WriteView = 
WriteView.CopyFrom(ReadView); It.OffsetInSegment += CopySize; ZEN_ASSERT_SLOW(It.OffsetInSegment <= SegmentSize); if (It.OffsetInSegment == SegmentSize) diff --git a/src/zencore/include/zencore/stream.h b/src/zencore/include/zencore/stream.h index a9d35ef1b..a28290041 100644 --- a/src/zencore/include/zencore/stream.h +++ b/src/zencore/include/zencore/stream.h @@ -34,8 +34,8 @@ public: inline const uint8_t* Data() const { return m_Buffer.data(); } inline const uint8_t* GetData() const { return m_Buffer.data(); } - inline uint64_t Size() const { return m_Buffer.size(); } - inline uint64_t GetSize() const { return m_Buffer.size(); } + inline uint64_t Size() const { return m_Offset; } + inline uint64_t GetSize() const { return m_Offset; } void Reset(); inline MemoryView GetView() const { return MemoryView(m_Buffer.data(), m_Offset); } diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index ea219f9b0..72a8e1409 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -448,13 +448,40 @@ struct ProjectStore::OplogStorage : public RefCounted return CbObject(SharedBuffer(std::move(OpBuffer))); } - OplogEntry AppendOp(MemoryView Buffer, uint32_t OpCoreHash, Oid KeyHash) + struct AppendOpData + { + MemoryView Buffer; + uint32_t OpCoreHash; + Oid KeyHash; + }; + + static OplogStorage::AppendOpData GetAppendOpData(const CbObjectView& Core) + { + using namespace std::literals; + + AppendOpData OpData; + + OpData.Buffer = Core.GetView(); + const uint64_t WriteSize = OpData.Buffer.GetSize(); + OpData.OpCoreHash = uint32_t(XXH3_64bits(OpData.Buffer.GetData(), WriteSize) & 0xffffFFFF); + + ZEN_ASSERT(WriteSize != 0); + + XXH3_128Stream KeyHasher; + Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash128 = KeyHasher.GetHash(); + memcpy(&OpData.KeyHash, KeyHash128.Hash, sizeof OpData.KeyHash); + + return OpData; + } 
+ + OplogEntry AppendOp(const AppendOpData& OpData) { ZEN_TRACE_CPU("Store::OplogStorage::AppendOp"); using namespace std::literals; - uint64_t WriteSize = Buffer.GetSize(); + uint64_t WriteSize = OpData.Buffer.GetSize(); RwLock::ExclusiveLockScope Lock(m_RwLock); const uint64_t WriteOffset = m_NextOpsOffset; @@ -467,15 +494,70 @@ struct ProjectStore::OplogStorage : public RefCounted OplogEntry Entry = {.OpLsn = OpLsn, .OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), .OpCoreSize = uint32_t(WriteSize), - .OpCoreHash = OpCoreHash, - .OpKeyHash = KeyHash}; + .OpCoreHash = OpData.OpCoreHash, + .OpKeyHash = OpData.KeyHash}; m_Oplog.Append(Entry); - m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset); + m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset); return Entry; } + std::vector<OplogEntry> AppendOps(std::span<const AppendOpData> Ops) + { + ZEN_TRACE_CPU("Store::OplogStorage::AppendOps"); + + using namespace std::literals; + + size_t OpCount = Ops.size(); + std::vector<std::pair<uint64_t, uint64_t>> OffsetAndSizes; + std::vector<uint32_t> OpLsns; + OffsetAndSizes.resize(OpCount); + OpLsns.resize(OpCount); + + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + OffsetAndSizes[OpIndex].second = Ops[OpIndex].Buffer.GetSize(); + } + + uint64_t WriteStart = 0; + uint64_t WriteLength = 0; + { + RwLock::ExclusiveLockScope Lock(m_RwLock); + WriteStart = m_NextOpsOffset; + ZEN_ASSERT(IsMultipleOf(WriteStart, m_OpsAlign)); + uint64_t WriteOffset = WriteStart; + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + OffsetAndSizes[OpIndex].first = WriteOffset - WriteStart; + OpLsns[OpIndex] = ++m_MaxLsn; + WriteOffset = RoundUp(WriteOffset + OffsetAndSizes[OpIndex].second, m_OpsAlign); + } + WriteLength = WriteOffset - WriteStart; + m_NextOpsOffset = RoundUp(WriteOffset, m_OpsAlign); + } + + IoBuffer WriteBuffer(WriteLength); + + std::vector<OplogEntry> Entries; + Entries.resize(OpCount); + for (size_t OpIndex = 0; 
OpIndex < OpCount; OpIndex++) + { + MutableMemoryView WriteBufferView = WriteBuffer.GetMutableView().RightChop(OffsetAndSizes[OpIndex].first); + WriteBufferView.CopyFrom(Ops[OpIndex].Buffer); + Entries[OpIndex] = {.OpLsn = OpLsns[OpIndex], + .OpCoreOffset = gsl::narrow_cast<uint32_t>((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign), + .OpCoreSize = uint32_t(OffsetAndSizes[OpIndex].second), + .OpCoreHash = Ops[OpIndex].OpCoreHash, + .OpKeyHash = Ops[OpIndex].KeyHash}; + } + + m_OpBlobs.Write(WriteBuffer.GetData(), WriteBuffer.GetSize(), WriteStart); + m_Oplog.Append(Entries); + + return Entries; + } + void AppendTombstone(Oid KeyHash) { OplogEntry Entry = {.OpKeyHash = KeyHash}; @@ -1243,6 +1325,17 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) return EntryId; } +RefPtr<ProjectStore::OplogStorage> +ProjectStore::Oplog::GetStorage() +{ + RefPtr<OplogStorage> Storage; + { + RwLock::SharedLockScope _(m_OplogLock); + Storage = m_Storage; + } + return Storage; +} + uint32_t ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) { @@ -1250,35 +1343,61 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) using namespace std::literals; - OplogEntryMapping Mapping = GetMapping(Core); + RefPtr<OplogStorage> Storage = GetStorage(); + if (!m_Storage) + { + return 0xffffffffu; + } - MemoryView Buffer = Core.GetView(); - const uint64_t WriteSize = Buffer.GetSize(); - const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); + OplogEntryMapping Mapping = GetMapping(Core); + OplogStorage::AppendOpData OpData = OplogStorage::GetAppendOpData(Core); - ZEN_ASSERT(WriteSize != 0); + const OplogEntry OpEntry = m_Storage->AppendOp(OpData); - XXH3_128Stream KeyHasher; - Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); - XXH3_128 KeyHash128 = KeyHasher.GetHash(); - Oid KeyHash; - memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); + RwLock::ExclusiveLockScope 
OplogLock(m_OplogLock); + const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry); - RefPtr<OplogStorage> Storage; + return EntryId; +} + +std::vector<uint32_t> +ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores) +{ + ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntries"); + + using namespace std::literals; + + RefPtr<OplogStorage> Storage = GetStorage(); + if (!m_Storage) { - RwLock::SharedLockScope _(m_OplogLock); - Storage = m_Storage; + return std::vector<uint32_t>(Cores.size(), 0xffffffffu); } - if (!m_Storage) + + size_t OpCount = Cores.size(); + std::vector<OplogEntryMapping> Mappings; + std::vector<OplogStorage::AppendOpData> OpDatas; + Mappings.resize(OpCount); + OpDatas.resize(OpCount); + + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) { - return 0xffffffffu; + const CbObjectView& Core = Cores[OpIndex]; + OpDatas[OpIndex] = OplogStorage::GetAppendOpData(Core); + Mappings[OpIndex] = GetMapping(Core); } - const OplogEntry OpEntry = m_Storage->AppendOp(Buffer, OpCoreHash, KeyHash); - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry); + std::vector<OplogEntry> OpEntries = Storage->AppendOps(OpDatas); - return EntryId; + std::vector<uint32_t> EntryIds; + EntryIds.resize(OpCount); + { + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]); + } + } + return EntryIds; } ////////////////////////////////////////////////////////////////////////// @@ -2730,7 +2849,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, } Project->TouchOplog(OplogId); - size_t MaxBlockSize = 128u * 1024u * 1024u; + size_t MaxBlockSize = RemoteStoreOptions::DefaultMaxBlockSize; if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false) { if (auto Value = ParseInt<size_t>(Param)) @@ -2738,7 
+2857,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, MaxBlockSize = Value.value(); } } - size_t MaxChunkEmbedSize = 1024u * 1024u; + size_t MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize; if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false) { if (auto Value = ParseInt<size_t>(Param)) @@ -2758,9 +2877,9 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, /* BuildBlocks */ false, /* IgnoreMissingAttachemnts */ false, [](CompressedBuffer&&, const IoHash) {}, - [](const IoHash&) {}, + [](const IoHash&, const TGetAttachmentBufferFunc&) {}, [](const std::unordered_set<IoHash, IoHash::Hasher>) {}, - nullptr); + /* EmbedLooseFiles*/ false); OutResponse = std::move(ContainerResult.ContainerObject); return ConvertResult(ContainerResult); @@ -3119,8 +3238,8 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op using namespace std::literals; - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(64u * 1024u * 1024u); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); @@ -3180,8 +3299,8 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, using namespace std::literals; - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(64u * 1024u * 1024u); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); + size_t MaxChunkEmbedSize = 
Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index f9611653b..5b873b758 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -109,7 +109,8 @@ public: */ uint32_t AppendNewOplogEntry(CbPackage Op); - uint32_t AppendNewOplogEntry(CbObjectView Core); + uint32_t AppendNewOplogEntry(CbObjectView Core); + std::vector<uint32_t> AppendNewOplogEntries(std::span<CbObjectView> Cores); enum UpdateType { @@ -166,6 +167,8 @@ public: RefPtr<OplogStorage> m_Storage; std::string m_OplogId; + RefPtr<OplogStorage> GetStorage(); + /** Scan oplog and register each entry, thus updating the in-memory tracking tables */ void ReplayLog(); diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 445179983..a8aa22526 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -197,17 +197,15 @@ CreateBlock(WorkerThreadPool& WorkerPool, { return; } - if (!Chunks.empty()) + ZEN_ASSERT(!Chunks.empty()); + CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash + IoHash BlockHash = CompressedBlock.DecodeRawHash(); { - CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash - IoHash BlockHash = CompressedBlock.DecodeRawHash(); - AsyncOnBlock(std::move(CompressedBlock), BlockHash); - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex].BlockHash = BlockHash; - } + // We can share the lock as we are not resizing the vector and only touch 
BlockHash at our own index + RwLock::SharedLockScope __(SectionsLock); + Blocks[BlockIndex].BlockHash = BlockHash; } + AsyncOnBlock(std::move(CompressedBlock), BlockHash); }); } @@ -234,30 +232,27 @@ BuildContainer(CidStore& ChunkStore, const std::vector<Block>& KnownBlocks, WorkerThreadPool& WorkerPool, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, - const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, - tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutLooseAttachments, + bool EmbedLooseFiles, JobContext* OptionalContext, AsyncRemoteResult& RemoteResult) { using namespace std::literals; - std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; - CbObjectWriter SectionOpsWriter; - SectionOpsWriter.BeginArray("ops"sv); - size_t OpCount = 0; CbObject OplogContainerObject; { + std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; + std::unordered_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseAttachments; + RwLock BlocksLock; std::vector<Block> Blocks; CompressedBuffer OpsBuffer; std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; - size_t BlockSize = 0; - std::vector<SharedBuffer> ChunksInBlock; std::unordered_map<IoHash, int, IoHash::Hasher> Attachments; auto RewriteOp = [&](int LSN, CbObjectView Op, const std::function<void(CbObjectView)>& CB) { @@ -282,111 +277,69 @@ BuildContainer(CidStore& ChunkStore, if (DataHash == IoHash::Zero) { + std::string_view ServerPath = View["serverpath"sv].AsString(); + std::filesystem::path FilePath = Project.RootDir / ServerPath; + if (!std::filesystem::is_regular_file(FilePath)) { - // Read file contents into memory and compress - - std::string_view ServerPath = View["serverpath"sv].AsString(); - std::filesystem::path FilePath = Project.RootDir / ServerPath; - BasicFile 
DataFile; - std::error_code Ec; - DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); - if (Ec) - { - ExtendableStringBuilder<1024> Sb; - Sb.Append("Failed to find attachment '"); - Sb.Append(FilePath.string()); - Sb.Append("' for op: \n"); - Op.ToJson(Sb); - - ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", FilePath, Sb.ToView())); - if (IgnoreMissingAttachments) - { - continue; - } - else - { - throw std::system_error( - Ec, - fmt::format("failed to open file '{}', mode: {:x}", FilePath, uint32_t(BasicFile::Mode::kRead))); - } - } - - IoBuffer FileIoBuffer = DataFile.ReadAll(); - DataFile.Close(); - - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); - - DataHash = Compressed.DecodeRawHash(); - uint64_t PayloadSize = Compressed.GetCompressed().GetSize(); - if (PayloadSize > MaxChunkEmbedSize) + ExtendableStringBuilder<1024> Sb; + Sb.Append("Failed to find attachment '"); + Sb.Append(FilePath.string()); + Sb.Append("' for op: \n"); + Op.ToJson(Sb); + + ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", FilePath, Sb.ToView())); + if (IgnoreMissingAttachments) { - // Write it out as a temporary file - IoBuffer AttachmentBuffer; - std::filesystem::path AttachmentPath = Oplog.TempPath() / DataHash.ToHexString(); - if (std::filesystem::is_regular_file(AttachmentPath)) - { - AttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath); - if (AttachmentBuffer.GetSize() != PayloadSize) - { - AttachmentBuffer = IoBuffer{}; - } - } - if (!AttachmentBuffer) - { - BasicFile BlockFile; - uint32_t RetriesLeft = 3; - BlockFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) { - if (RetriesLeft == 0) - { - return false; - } - ZEN_WARN("Failed to create temp attachment '{}': '{}', retries left: {}.", - AttachmentPath, - Ec.message(), - RetriesLeft); - Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms - RetriesLeft--; - return true; - }); - 
uint64_t Offset = 0; - for (const SharedBuffer& Buffer : Compressed.GetCompressed().GetSegments()) - { - BlockFile.Write(Buffer.GetView(), Offset); - Offset += Buffer.GetSize(); - } - void* FileHandle = BlockFile.Detach(); - AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); - - AttachmentBuffer.SetDeleteOnClose(true); - ZEN_DEBUG("Saved temp attachment {}, {}", DataHash, NiceBytes(PayloadSize)); - } - OutLooseAttachments->insert_or_assign(DataHash, AttachmentBuffer); + continue; } else { - // If it is small we just hang on to the compressed buffer - OutLooseAttachments->insert_or_assign(DataHash, Compressed.GetCompressed().Flatten().AsIoBuffer()); + throw std::runtime_error(fmt::format("failed to open file '{}'", FilePath)); } } + SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath)); + // Large file, just hash it for now and leave compression for later via callback + // We don't want to spend time compressing a large attachment if we don't need to upload it + DataHash = IoHash::HashBuffer(DataBuffer); + if (DataBuffer.GetSize() > MaxChunkEmbedSize) + { + LargeChunkHashes.insert(DataHash); + } + LooseAttachments.insert_or_assign( + DataHash, + [AttachmentBuffer = std::move(DataBuffer)](const IoHash& DataHash) -> IoBuffer { + Stopwatch AttachmentTimer; + uint64_t RawSize = AttachmentBuffer.GetSize(); + CompressedBuffer Compressed = + CompressedBuffer::Compress(AttachmentBuffer, OodleCompressor::Mermaid, OodleCompressionLevel::Normal); + ZEN_ASSERT(Compressed.DecodeRawHash() == DataHash); + uint64_t PayloadSize = Compressed.GetCompressedSize(); + ZEN_INFO("Compressed loose file attachment {} ({} -> {}) in {}", + DataHash, + NiceBytes(RawSize), + NiceBytes(PayloadSize), + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentTimer.GetElapsedTimeMs()))); + return Compressed.GetCompressed().Flatten().AsIoBuffer(); + }); + } - // Rewrite file array entry with new data reference - CbObjectWriter Writer; - RewriteCbObject(Writer, 
View, [&](CbObjectWriter&, CbFieldView Field) -> bool { - if (Field.GetName() == "data"sv) - { - // omit this field as we will write it explicitly ourselves - return true; - } - return false; - }); - Writer.AddBinaryAttachment("data"sv, DataHash); + // Rewrite file array entry with new data reference + CbObjectWriter Writer; + RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { + if (Field.GetName() == "data"sv) + { + // omit this field as we will write it explicitly ourselves + return true; + } + return false; + }); + Writer.AddBinaryAttachment("data"sv, DataHash); - CbObject RewrittenOp = Writer.Save(); - Cbo.AddObject(std::move(RewrittenOp)); - CopyField = false; + CbObject RewrittenOp = Writer.Save(); + Cbo.AddObject(std::move(RewrittenOp)); + CopyField = false; - Attachments.insert_or_assign(DataHash, LSN); - } + Attachments.insert_or_assign(DataHash, LSN); } if (CopyField) @@ -423,41 +376,66 @@ BuildContainer(CidStore& ChunkStore, ReportMessage(OptionalContext, "Building exported oplog and collecting attachments"); - Stopwatch Timer; - tsl::robin_map<int, std::string> OpLSNToKey; + Stopwatch Timer; + tsl::robin_map<int, std::string> OpLSNToKey; + CompressedBuffer CompressedOpsSection; { - Stopwatch RewriteOplogTimer; - Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) { - if (RemoteResult.IsError()) - { - return; - } - std::string_view Key = Op["key"sv].AsString(); - OpLSNToKey.insert({LSN, std::string(Key)}); - Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); }); - if (OutLooseAttachments != nullptr) - { - RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); - } - else - { - SectionOpsWriter << Op; - } - OpCount++; + CbObjectWriter SectionOpsWriter; + SectionOpsWriter.BeginArray("ops"sv); + { + Stopwatch RewriteOplogTimer; + Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) { + if (RemoteResult.IsError()) + { 
+ return; + } + std::string_view Key = Op["key"sv].AsString(); + OpLSNToKey.insert({LSN, std::string(Key)}); + Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); }); + if (EmbedLooseFiles) + { + RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); + } + else + { + SectionOpsWriter << Op; + } + OpCount++; + if (IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + return; + } + if (OpCount % 100000 == 0) + { + ReportMessage(OptionalContext, fmt::format("Building oplog, at op {}...", OpCount)); + } + }); if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + return {}; } - if (OpCount % 100000 == 0) - { - ReportMessage(OptionalContext, fmt::format("Building oplog, at op {}...", OpCount)); - } - }); - ReportMessage(OptionalContext, - fmt::format("Rewrote {} ops to new oplog in {}", - OpCount, - NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs())))); + ReportMessage(OptionalContext, + fmt::format("Rewrote {} ops to new oplog in {}", + OpCount, + NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs())))); + } + SectionOpsWriter.EndArray(); // "ops" + + { + Stopwatch CompressOpsTimer; + CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), + OodleCompressor::Mermaid, + OodleCompressionLevel::Normal); + ReportMessage(OptionalContext, + fmt::format("Compressed oplog section {} ({} -> {}) in {}", + CompressedOpsSection.DecodeRawHash(), + NiceBytes(CompressedOpsSection.DecodeRawSize()), + NiceBytes(CompressedOpsSection.GetCompressedSize()), + NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs())))); + } } if (IsCancelled(OptionalContext)) @@ -565,96 +543,205 @@ BuildContainer(CidStore& ChunkStore, SortedAttachments.size(), OpLSNToKey.size())); - auto 
GetPayload = [&](const IoHash& AttachmentHash) { - if (OutLooseAttachments != nullptr) + for (const IoHash& AttachmentHash : LargeChunkHashes) + { + if (IsCancelled(OptionalContext)) { - auto PayloadIt = OutLooseAttachments->find(AttachmentHash); - if (PayloadIt != OutLooseAttachments->end()) - { - return PayloadIt->second; - } + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + return {}; + } + + if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end()) + { + OnLargeAttachment(AttachmentHash, std::move(It->second)); + LooseAttachments.erase(It); } - return ChunkStore.FindChunkByCid(AttachmentHash); + else + { + OnLargeAttachment(AttachmentHash, + [&ChunkStore](const IoHash& AttachmentHash) { return ChunkStore.FindChunkByCid(AttachmentHash); }); + } + } + size_t LargeAttachmentCount = LargeChunkHashes.size(); + + Latch BlockCreateLatch(1); + size_t GeneratedBlockCount = 0; + size_t BlockSize = 0; + std::vector<SharedBuffer> ChunksInBlock; + int LastLSNOp = -1; + auto GetPayload = [&](const IoHash& AttachmentHash) { + if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end()) + { + IoBuffer Payload = It->second(AttachmentHash); + LooseAttachments.erase(It); + return Payload; + } + return ChunkStore.FindChunkByCid(AttachmentHash); }; - Latch BlockCreateLatch(1); - size_t GeneratedBlockCount = 0; - size_t LargeAttachmentCount = 0; - int LastLSNOp = -1; + uint32_t ResolvedLargeCount = 0; + uint32_t ResolvedSmallCount = 0; + uint32_t ResolvedFailedCount = 0; + uint32_t ComposedBlocks = 0; - for (const IoHash& AttachmentHash : SortedAttachments) + for (auto HashIt = SortedAttachments.begin(); HashIt != SortedAttachments.end();) { if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) + break; + } + ReportProgress(OptionalContext, + 
fmt::format("Resolving attachments: {} large, {} small, {} blocks built", + ResolvedLargeCount, + ResolvedSmallCount, + ComposedBlocks), + SortedAttachments.size(), + SortedAttachments.size() - (ResolvedLargeCount + ResolvedSmallCount + ResolvedFailedCount)); + + std::vector<IoHash> SmallChunkHashes; + SmallChunkHashes.reserve(8); + while (HashIt != SortedAttachments.end() && SmallChunkHashes.size() < 8) + { + if (LargeChunkHashes.contains(*HashIt)) { - ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + ResolvedLargeCount++; } - return {}; + else + { + SmallChunkHashes.push_back(*HashIt); + } + ++HashIt; } - auto It = Attachments.find(AttachmentHash); - ZEN_ASSERT(It != Attachments.end()); - IoBuffer Payload = GetPayload(AttachmentHash); - if (!Payload) + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> ResolvedSmallChunks; + RwLock ResolvedSmallChunksLock; + Latch ResolvedSmallChunksLatch(1); + for (const IoHash& SmallChunkHash : SmallChunkHashes) { - std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second); - ZEN_ASSERT(Op.has_value()); - ExtendableStringBuilder<1024> Sb; - Sb.Append("Failed to find attachment '"); - Sb.Append(AttachmentHash.ToHexString()); - Sb.Append("' for op: \n"); - Op.value().ToJson(Sb); + ResolvedSmallChunksLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&GetPayload, SmallChunkHash, &ResolvedSmallChunksLatch, &ResolvedSmallChunksLock, &ResolvedSmallChunks]() { + auto _ = MakeGuard([&ResolvedSmallChunksLatch] { ResolvedSmallChunksLatch.CountDown(); }); - ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView())); + IoBuffer Payload = GetPayload(SmallChunkHash); + { + RwLock::ExclusiveLockScope __(ResolvedSmallChunksLock); + ResolvedSmallChunks.insert_or_assign(SmallChunkHash, std::move(Payload)); + } + }); + } + ResolvedSmallChunksLatch.CountDown(); + ResolvedSmallChunksLatch.Wait(); - if (IgnoreMissingAttachments) + for (const IoHash& AttachmentHash : 
SmallChunkHashes) + { + auto ResolvedIt = ResolvedSmallChunks.find(AttachmentHash); + ZEN_ASSERT(ResolvedIt != ResolvedSmallChunks.end()); + IoBuffer Payload = std::move(ResolvedIt->second); + auto It = Attachments.find(AttachmentHash); + ZEN_ASSERT(It != Attachments.end()); + if (!Payload) { - continue; + std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second); + ZEN_ASSERT(Op.has_value()); + ExtendableStringBuilder<1024> Sb; + Sb.Append("Failed to find attachment '"); + Sb.Append(AttachmentHash.ToHexString()); + Sb.Append("' for op: \n"); + Op.value().ToJson(Sb); + + ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView())); + + if (IgnoreMissingAttachments) + { + ResolvedFailedCount++; + continue; + } + else + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + } + return {}; + } } - else + + uint64_t PayloadSize = Payload.GetSize(); + if (PayloadSize > MaxChunkEmbedSize) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) + if (LargeChunkHashes.insert(AttachmentHash).second) { - ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + OnLargeAttachment(AttachmentHash, [Payload = std::move(Payload)](const IoHash&) { return std::move(Payload); }); + LargeAttachmentCount++; } - return {}; + ResolvedLargeCount++; + continue; + } + else + { + ResolvedSmallCount++; } - } - uint64_t PayloadSize = Payload.GetSize(); - if (PayloadSize > MaxChunkEmbedSize) - { - if (LargeChunkHashes.insert(AttachmentHash).second) + if (!BlockAttachmentHashes.insert(AttachmentHash).second) { - OnLargeAttachment(AttachmentHash); - LargeAttachmentCount++; + continue; } - continue; - } - if 
(!BlockAttachmentHashes.insert(AttachmentHash).second) - { - continue; - } + const int CurrentOpLSN = It->second; - const int CurrentOpLSN = It->second; + BlockSize += PayloadSize; + if (BuildBlocks) + { + ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload))); + } + else + { + Payload = {}; + } - BlockSize += PayloadSize; - if (BuildBlocks) - { - ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload))); - } - else - { - Payload = {}; + if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp)) + { + size_t BlockIndex = AddBlock(BlocksLock, Blocks); + if (BuildBlocks) + { + CreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); + ComposedBlocks++; + } + else + { + ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size()); + OnBlockChunks(BlockAttachmentHashes); + } + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope _(BlocksLock); + Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); + } + BlockAttachmentHashes.clear(); + ChunksInBlock.clear(); + BlockSize = 0; + GeneratedBlockCount++; + } + LastLSNOp = CurrentOpLSN; } + } - if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp)) + if (BlockSize > 0) + { + if (!IsCancelled(OptionalContext)) { size_t BlockIndex = AddBlock(BlocksLock, Blocks); if (BuildBlocks) @@ -667,6 +754,7 @@ BuildContainer(CidStore& ChunkStore, BlockIndex, AsyncOnBlock, RemoteResult); + ComposedBlocks++; } else { @@ -685,51 +773,15 @@ BuildContainer(CidStore& ChunkStore, BlockSize = 0; GeneratedBlockCount++; } - LastLSNOp = CurrentOpLSN; - } - if (BlockSize > 0) - { - size_t BlockIndex = AddBlock(BlocksLock, Blocks); - if (BuildBlocks) - { - CreateBlock(WorkerPool, - BlockCreateLatch, - std::move(ChunksInBlock), - BlocksLock, - Blocks, - BlockIndex, - 
AsyncOnBlock, - RemoteResult); - } - else - { - ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size()); - OnBlockChunks(BlockAttachmentHashes); - } - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), - BlockAttachmentHashes.begin(), - BlockAttachmentHashes.end()); - } - BlockAttachmentHashes.clear(); - ChunksInBlock.clear(); - BlockSize = 0; - GeneratedBlockCount++; } - SectionOpsWriter.EndArray(); // "ops" + ReportProgress(OptionalContext, + fmt::format("Resolving attachments: {} large, {} small, {} blocks built", + ResolvedLargeCount, + ResolvedSmallCount, + ComposedBlocks), + SortedAttachments.size(), + 0); - if (IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) - { - ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); - } - return {}; - } ReportMessage(OptionalContext, fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}", SortedAttachments.size(), @@ -738,7 +790,6 @@ BuildContainer(CidStore& ChunkStore, LargeAttachmentCount, NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); - CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); @@ -754,10 +805,6 @@ BuildContainer(CidStore& ChunkStore, } return {}; } - ReportMessage(OptionalContext, - fmt::format("Added oplog section {}, {}", - CompressedOpsSection.DecodeRawHash(), - NiceBytes(CompressedOpsSection.GetCompressedSize()))); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) @@ -856,9 +903,9 @@ 
BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, - const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, - tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutOptionalTempAttachments) + bool EmbedLooseFiles) { WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); @@ -875,7 +922,7 @@ BuildContainer(CidStore& ChunkStore, AsyncOnBlock, OnLargeAttachment, OnBlockChunks, - OutOptionalTempAttachments, + EmbedLooseFiles, nullptr, RemoteResult); return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; @@ -891,18 +938,18 @@ struct UploadInfo }; void -UploadAttachments(WorkerThreadPool& WorkerPool, - CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments, - const std::vector<std::vector<IoHash>>& BlockChunks, - const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks, - const tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>& TempAttachments, - const std::unordered_set<IoHash, IoHash::Hasher>& Needs, - bool ForceAll, - UploadInfo& Info, - AsyncRemoteResult& RemoteResult, - JobContext* OptionalContext) +UploadAttachments(WorkerThreadPool& WorkerPool, + CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments, + const std::vector<std::vector<IoHash>>& BlockChunks, + const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks, + const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments, + const std::unordered_set<IoHash, IoHash::Hasher>& Needs, + bool ForceAll, + UploadInfo& Info, + AsyncRemoteResult& RemoteResult, + JobContext* 
OptionalContext) { using namespace std::literals; @@ -1004,16 +1051,6 @@ UploadAttachments(WorkerThreadPool& WorkerPool, continue; } - IoBuffer Payload; - if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) - { - Payload = BlockIt->second; - } - else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end()) - { - Payload = LooseTmpFileIt->second; - } - SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; WorkerPool.ScheduleWork([&ChunkStore, @@ -1022,7 +1059,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, &RemoteResult, RawHash, &CreatedBlocks, - TempPayload = std::move(Payload), + &LooseFileAttachments, &Info, OptionalContext]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); @@ -1030,7 +1067,19 @@ UploadAttachments(WorkerThreadPool& WorkerPool, { return; } - IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); + IoBuffer Payload; + if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) + { + Payload = BlockIt->second; + } + else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) + { + Payload = LooseTmpFileIt->second(RawHash); + } + else + { + Payload = ChunkStore.FindChunkByCid(RawHash); + } if (!Payload) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), @@ -1258,10 +1307,11 @@ SaveOplog(CidStore& ChunkStore, CreateDirectories(AttachmentTempPath); } - AsyncRemoteResult RemoteResult; - RwLock AttachmentsLock; - std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; - std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks; + AsyncRemoteResult RemoteResult; + RwLock AttachmentsLock; + std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks; + tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles; auto 
MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { @@ -1331,10 +1381,12 @@ SaveOplog(CidStore& ChunkStore, ZEN_DEBUG("Found {} block chunks", Chunks.size()); }; - auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments](const IoHash& AttachmentHash) { + auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments, &LooseLargeFiles](const IoHash& AttachmentHash, + TGetAttachmentBufferFunc&& GetBufferFunc) { { RwLock::ExclusiveLockScope _(AttachmentsLock); LargeAttachments.insert(AttachmentHash); + LooseLargeFiles.insert_or_assign(AttachmentHash, std::move(GetBufferFunc)); } ZEN_DEBUG("Found attachment {}", AttachmentHash); }; @@ -1394,8 +1446,7 @@ SaveOplog(CidStore& ChunkStore, } } - tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> TempAttachments; - CbObject OplogContainerObject = BuildContainer(ChunkStore, + CbObject OplogContainerObject = BuildContainer(ChunkStore, Project, Oplog, MaxBlockSize, @@ -1407,7 +1458,7 @@ SaveOplog(CidStore& ChunkStore, OnBlock, OnLargeAttachment, OnBlockChunks, - EmbedLooseFiles ? 
&TempAttachments : nullptr, + EmbedLooseFiles, OptionalContext, /* out */ RemoteResult); if (!RemoteResult.IsError()) @@ -1451,7 +1502,7 @@ SaveOplog(CidStore& ChunkStore, LargeAttachments, BlockChunks, CreatedBlocks, - TempAttachments, + LooseLargeFiles, ContainerSaveResult.Needs, ForceUpload, Info, @@ -1510,7 +1561,7 @@ SaveOplog(CidStore& ChunkStore, LargeAttachments, BlockChunks, CreatedBlocks, - TempAttachments, + LooseLargeFiles, ContainerFinalizeResult.Needs, false, Info, @@ -1518,7 +1569,7 @@ SaveOplog(CidStore& ChunkStore, OptionalContext); } - TempAttachments.clear(); + LooseLargeFiles.clear(); CreatedBlocks.clear(); } RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); @@ -1612,54 +1663,65 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); - CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); - if (!SectionObject) - { - ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type")); - return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), - Timer.GetElapsedTimeMs() / 1000.500, - "Section has unexpected data type", - "Failed to save oplog container"}; - } - - CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); - const uint64_t OpCount = OpsArray.Num(); - uint64_t OpsLoaded = 0; - ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num())); - BinaryWriter Writer; - for (CbFieldView OpEntry : OpsArray) { - CbObjectView Op = OpEntry.AsObjectView(); - Op.CopyTo(Writer); - CbObjectView TypedOp(Writer.GetData()); - const uint32_t OpLsn = Oplog.AppendNewOplogEntry(TypedOp); - Writer.Reset(); - if (OpLsn == ProjectStore::Oplog::kInvalidOp) + CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); + if 
(!SectionObject) { - ReportMessage(OptionalContext, fmt::format("Failed to save op {}", OpsLoaded)); + ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type")); return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.500, - "Failed saving op", + "Section has unexpected data type", "Failed to save oplog container"}; } - ZEN_DEBUG("oplog entry #{}", OpLsn); - if (OpCount > 100000) - { - if (IsCancelled(OptionalContext)) + + CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + const uint64_t OpCount = OpsArray.Num(); + + ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount)); + + const size_t OpsBatchSize = 8192; + std::vector<uint8_t> OpsData; + std::vector<size_t> OpDataOffsets; + size_t OpsCompleteCount = 0; + + OpsData.reserve(OpsBatchSize); + + auto AppendBatch = [&]() { + std::vector<CbObjectView> Ops; + Ops.reserve(OpDataOffsets.size()); + for (size_t OpDataOffset : OpDataOffsets) { - return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::OK), - Timer.GetElapsedTimeMs() / 1000.500, - "Operation cancelled", - ""}; + Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset])); } - if (OpsLoaded % 10000 == 0) + std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops); + OpsCompleteCount += OpLsns.size(); + OpsData.clear(); + OpDataOffsets.clear(); + ReportProgress(OptionalContext, + fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount), + OpCount, + OpCount - OpsCompleteCount); + }; + + BinaryWriter Writer; + for (CbFieldView OpEntry : OpsArray) + { + CbObjectView Op = OpEntry.AsObjectView(); + Op.CopyTo(Writer); + OpDataOffsets.push_back(OpsData.size()); + OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize()); + Writer.Reset(); + + if (OpDataOffsets.size() == OpsBatchSize) { - ReportProgress(OptionalContext, 
"Writing oplog", OpCount, OpCount - OpsLoaded); + AppendBatch(); } } - OpsLoaded++; + if (!OpDataOffsets.empty()) + { + AppendBatch(); + } } - ReportProgress(OptionalContext, "Writing oplog", OpCount, 0); return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; } @@ -1975,7 +2037,7 @@ LoadOplog(CidStore& ChunkStore, NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), Info.AttachmentsDownloaded.load(), NiceBytes(Info.AttachmentBytesDownloaded.load()), - NiceBytes(Info.AttachmentsStored.load()), + Info.AttachmentsStored.load(), NiceBytes(Info.AttachmentBytesStored.load()), Info.MissingAttachmentCount.load())); return Result; diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index 7f0cb0ebb..c5ebca646 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -84,10 +84,15 @@ public: struct RemoteStoreOptions { - size_t MaxBlockSize = 128u * 1024u * 1024u; - size_t MaxChunkEmbedSize = 1024u * 1024u; + static const size_t DefaultMaxBlockSize = 32u * 1024u * 1024u; + static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u; + + size_t MaxBlockSize = DefaultMaxBlockSize; + size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize; }; +typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc; + RemoteProjectStore::LoadContainerResult BuildContainer( CidStore& ChunkStore, ProjectStore::Project& Project, @@ -97,10 +102,9 @@ RemoteProjectStore::LoadContainerResult BuildContainer( bool BuildBlocks, bool IgnoreMissingAttachments, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, - const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, - tsl::robin_map<IoHash, IoBuffer, 
IoHash::Hasher>* - OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files" + bool EmbedLooseFiles); class JobContext; |