diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-11 05:56:13 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-11 11:56:13 +0200 |
| commit | 076ee85e41e346eaa2f44f25c5852b88744cca7d (patch) | |
| tree | 5afa2e22de0258aa85a64cb022941925f6c072ee /src | |
| parent | add console logging to zen command (#389) (diff) | |
| download | zen-076ee85e41e346eaa2f44f25c5852b88744cca7d.tar.xz zen-076ee85e41e346eaa2f44f25c5852b88744cca7d.zip | |
better sorting of attachments in oplog blocks (#390)
- sort attachments based on the first op that references them
- sort attachments by the key of their op, rather than by the order in which the ops were written, to get a more consistent order
- keep attachments from same op in same block
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 67 |
1 files changed, 54 insertions, 13 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index f10d7da63..080517a8d 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -364,8 +364,14 @@ BuildContainer(CidStore& ChunkStore, CB(RewrittenOp); }; + ZEN_INFO("Building exported oplog and fetching attachments"); + + tsl::robin_map<int, std::string> OpLSNToKey; + Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObject Op) { - Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert_or_assign(FieldView.AsAttachment(), LSN); }); + std::string_view Key = Op["key"sv].AsString(); + OpLSNToKey.insert({LSN, std::string(Key)}); + Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); }); if (OutLooseAttachments != nullptr) { RewriteOp(LSN, Op, [&SectionOpsWriter](CbObject Op) { SectionOpsWriter << Op; }); @@ -377,6 +383,33 @@ BuildContainer(CidStore& ChunkStore, OpCount++; }); + ZEN_INFO("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size()); + + // Sort attachments so we get predictable blocks for the same oplog upload + std::vector<IoHash> SortedAttachments; + SortedAttachments.reserve(Attachments.size()); + for (const auto& It : Attachments) + { + SortedAttachments.push_back(It.first); + } + std::sort(SortedAttachments.begin(), SortedAttachments.end(), [&Attachments, &OpLSNToKey](const IoHash& Lhs, const IoHash& Rhs) { + auto LhsLNSIt = Attachments.find(Lhs); + ZEN_ASSERT_SLOW(LhsLNSIt != Attachments.end()); + auto RhsLNSIt = Attachments.find(Rhs); + ZEN_ASSERT_SLOW(RhsLNSIt != Attachments.end()); + if (LhsLNSIt->second == RhsLNSIt->second) + { + return Lhs < Rhs; + } + auto LhsKeyIt = OpLSNToKey.find(LhsLNSIt->second); + ZEN_ASSERT_SLOW(LhsKeyIt != OpLSNToKey.end()); + auto RhsKeyIt = OpLSNToKey.find(RhsLNSIt->second); + ZEN_ASSERT_SLOW(RhsKeyIt != OpLSNToKey.end()); + return LhsKeyIt->second < 
RhsKeyIt->second; + }); + + ZEN_INFO("Assembling {} attachments from {} ops into blocks and loose attachments", SortedAttachments.size(), OpLSNToKey.size()); + auto GetPayload = [&](const IoHash& AttachmentHash) { if (OutLooseAttachments != nullptr) { @@ -389,23 +422,17 @@ BuildContainer(CidStore& ChunkStore, return ChunkStore.FindChunkByCid(AttachmentHash); }; - // Sort attachments so we get predictable blocks for the same oplog upload - std::vector<IoHash> SortedAttachments; - SortedAttachments.reserve(Attachments.size()); - for (const auto It : Attachments) - { - SortedAttachments.push_back(It.first); - } - std::sort(SortedAttachments.begin(), SortedAttachments.end()); + int LastLSNOp = -1; + size_t GeneratedBlockCount = 0; + size_t LargeAttachmentCount = 0; for (const IoHash& AttachmentHash : SortedAttachments) { + auto It = Attachments.find(AttachmentHash); + ZEN_ASSERT(It != Attachments.end()); IoBuffer Payload = GetPayload(AttachmentHash); if (!Payload) { - auto It = Attachments.find(AttachmentHash); - ZEN_ASSERT(It != Attachments.end()); - std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second); ZEN_ASSERT(Op.has_value()); ExtendableStringBuilder<1024> Sb; @@ -432,6 +459,7 @@ BuildContainer(CidStore& ChunkStore, if (LargeChunkHashes.insert(AttachmentHash).second) { OnLargeAttachment(AttachmentHash); + LargeAttachmentCount++; } continue; } @@ -441,6 +469,8 @@ BuildContainer(CidStore& ChunkStore, continue; } + const int CurrentOpLSN = It->second; + BlockSize += PayloadSize; if (BuildBlocks) { @@ -451,8 +481,9 @@ BuildContainer(CidStore& ChunkStore, Payload = {}; } - if (BlockSize >= MaxBlockSize) + if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp)) { + ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size()); size_t BlockIndex = AddBlock(BlocksLock, Blocks); if (BuildBlocks) { @@ -479,10 +510,13 @@ BuildContainer(CidStore& ChunkStore, BlockAttachmentHashes.clear(); ChunksInBlock.clear(); BlockSize = 0; + GeneratedBlockCount++; } + 
LastLSNOp = CurrentOpLSN; } if (BlockSize > 0) { + ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size()); size_t BlockIndex = AddBlock(BlocksLock, Blocks); if (BuildBlocks) { @@ -509,9 +543,16 @@ BuildContainer(CidStore& ChunkStore, BlockAttachmentHashes.clear(); ChunksInBlock.clear(); BlockSize = 0; + GeneratedBlockCount++; } SectionOpsWriter.EndArray(); // "ops" + ZEN_INFO("Assembled {} attachments from {} ops into {} blocks and {} loose attachments", + SortedAttachments.size(), + OpLSNToKey.size(), + GeneratedBlockCount, + LargeAttachmentCount); + CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize())); |