aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-11 05:56:13 -0400
committerGitHub <[email protected]>2023-09-11 11:56:13 +0200
commit076ee85e41e346eaa2f44f25c5852b88744cca7d (patch)
tree5afa2e22de0258aa85a64cb022941925f6c072ee /src
parentadd console logging to zen command (#389) (diff)
downloadzen-076ee85e41e346eaa2f44f25c5852b88744cca7d.tar.xz
zen-076ee85e41e346eaa2f44f25c5852b88744cca7d.zip
better sorting of attachments in oplog blocks (#390)
- sort attachments based on (first) associated op - sort attachments based on key of op to get a more consistent order rather than order of ops written - keep attachments from same op in same block
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp67
1 files changed, 54 insertions, 13 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index f10d7da63..080517a8d 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -364,8 +364,14 @@ BuildContainer(CidStore& ChunkStore,
CB(RewrittenOp);
};
+ ZEN_INFO("Building exported oplog and fetching attachments");
+
+ tsl::robin_map<int, std::string> OpLSNToKey;
+
Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObject Op) {
- Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert_or_assign(FieldView.AsAttachment(), LSN); });
+ std::string_view Key = Op["key"sv].AsString();
+ OpLSNToKey.insert({LSN, std::string(Key)});
+ Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
if (OutLooseAttachments != nullptr)
{
RewriteOp(LSN, Op, [&SectionOpsWriter](CbObject Op) { SectionOpsWriter << Op; });
@@ -377,6 +383,33 @@ BuildContainer(CidStore& ChunkStore,
OpCount++;
});
+ ZEN_INFO("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size());
+
+ // Sort attachments so we get predictable blocks for the same oplog upload
+ std::vector<IoHash> SortedAttachments;
+ SortedAttachments.reserve(Attachments.size());
+ for (const auto& It : Attachments)
+ {
+ SortedAttachments.push_back(It.first);
+ }
+ std::sort(SortedAttachments.begin(), SortedAttachments.end(), [&Attachments, &OpLSNToKey](const IoHash& Lhs, const IoHash& Rhs) {
+ auto LhsLNSIt = Attachments.find(Lhs);
+ ZEN_ASSERT_SLOW(LhsLNSIt != Attachments.end());
+ auto RhsLNSIt = Attachments.find(Rhs);
+ ZEN_ASSERT_SLOW(RhsLNSIt != Attachments.end());
+ if (LhsLNSIt->second == RhsLNSIt->second)
+ {
+ return Lhs < Rhs;
+ }
+ auto LhsKeyIt = OpLSNToKey.find(LhsLNSIt->second);
+ ZEN_ASSERT_SLOW(LhsKeyIt != OpLSNToKey.end());
+ auto RhsKeyIt = OpLSNToKey.find(RhsLNSIt->second);
+ ZEN_ASSERT_SLOW(RhsKeyIt != OpLSNToKey.end());
+ return LhsKeyIt->second < RhsKeyIt->second;
+ });
+
+ ZEN_INFO("Assembling {} attachments from {} ops into blocks and loose attachments", SortedAttachments.size(), OpLSNToKey.size());
+
auto GetPayload = [&](const IoHash& AttachmentHash) {
if (OutLooseAttachments != nullptr)
{
@@ -389,23 +422,17 @@ BuildContainer(CidStore& ChunkStore,
return ChunkStore.FindChunkByCid(AttachmentHash);
};
- // Sort attachments so we get predictable blocks for the same oplog upload
- std::vector<IoHash> SortedAttachments;
- SortedAttachments.reserve(Attachments.size());
- for (const auto It : Attachments)
- {
- SortedAttachments.push_back(It.first);
- }
- std::sort(SortedAttachments.begin(), SortedAttachments.end());
+ int LastLSNOp = -1;
+ size_t GeneratedBlockCount = 0;
+ size_t LargeAttachmentCount = 0;
for (const IoHash& AttachmentHash : SortedAttachments)
{
+ auto It = Attachments.find(AttachmentHash);
+ ZEN_ASSERT(It != Attachments.end());
IoBuffer Payload = GetPayload(AttachmentHash);
if (!Payload)
{
- auto It = Attachments.find(AttachmentHash);
- ZEN_ASSERT(It != Attachments.end());
-
std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second);
ZEN_ASSERT(Op.has_value());
ExtendableStringBuilder<1024> Sb;
@@ -432,6 +459,7 @@ BuildContainer(CidStore& ChunkStore,
if (LargeChunkHashes.insert(AttachmentHash).second)
{
OnLargeAttachment(AttachmentHash);
+ LargeAttachmentCount++;
}
continue;
}
@@ -441,6 +469,8 @@ BuildContainer(CidStore& ChunkStore,
continue;
}
+ const int CurrentOpLSN = It->second;
+
BlockSize += PayloadSize;
if (BuildBlocks)
{
@@ -451,8 +481,9 @@ BuildContainer(CidStore& ChunkStore,
Payload = {};
}
- if (BlockSize >= MaxBlockSize)
+ if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp))
{
+ ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size());
size_t BlockIndex = AddBlock(BlocksLock, Blocks);
if (BuildBlocks)
{
@@ -479,10 +510,13 @@ BuildContainer(CidStore& ChunkStore,
BlockAttachmentHashes.clear();
ChunksInBlock.clear();
BlockSize = 0;
+ GeneratedBlockCount++;
}
+ LastLSNOp = CurrentOpLSN;
}
if (BlockSize > 0)
{
+ ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size());
size_t BlockIndex = AddBlock(BlocksLock, Blocks);
if (BuildBlocks)
{
@@ -509,9 +543,16 @@ BuildContainer(CidStore& ChunkStore,
BlockAttachmentHashes.clear();
ChunksInBlock.clear();
BlockSize = 0;
+ GeneratedBlockCount++;
}
SectionOpsWriter.EndArray(); // "ops"
+ ZEN_INFO("Assembled {} attachments from {} ops into {} blocks and {} loose attachments",
+ SortedAttachments.size(),
+ OpLSNToKey.size(),
+ GeneratedBlockCount,
+ LargeAttachmentCount);
+
CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize()));