aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/chunking
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-12-15 13:20:21 +0100
committerGitHub Enterprise <[email protected]>2025-12-15 13:20:21 +0100
commita715d3ab7701e6257730a73c62567052d21c9771 (patch)
tree1f6b1de9c7cf11ec1403187d77d74a3b1af52a39 /src/zenremotestore/chunking
parentshow download source data (#689) (diff)
downloadzen-a715d3ab7701e6257730a73c62567052d21c9771.tar.xz
zen-a715d3ab7701e6257730a73c62567052d21c9771.zip
oplog download size (#690)
- Bugfix: Upload of oplogs could reference multiple blocks for the same chunk, causing redundant downloads of blocks
- Improvement: Use the improved block reuse selection function from zen builds upload in zen oplog-export to reduce oplog download size
Diffstat (limited to 'src/zenremotestore/chunking')
-rw-r--r--  src/zenremotestore/chunking/chunkblock.cpp | 431
1 file changed, 431 insertions, 0 deletions
diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp
index 05ae13de1..a5d0db205 100644
--- a/src/zenremotestore/chunking/chunkblock.cpp
+++ b/src/zenremotestore/chunking/chunkblock.cpp
@@ -5,14 +5,23 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+
+#include <zenremotestore/operationlogoutput.h>
#include <vector>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
# include <zencore/testutils.h>
# include <unordered_map>
+# include <numeric>
#endif // ZEN_WITH_TESTS
namespace zen {
@@ -261,6 +270,188 @@ IterateChunkBlock(const SharedBuffer& BlockPayload,
return true;
};
+// Chooses which already-known chunk blocks to reuse for a wanted set of
+// chunks so that each wanted chunk is served by at most one accepted block
+// (this is the fix for the redundant-download bug where several blocks could
+// be referenced for the same chunk). Greedy two-pass selection; the
+// step-by-step outline is in the comments at the top of the body.
+//
+// Output:   destination for verbose per-block log messages.
+// BlockReuseMinPercentLimit: minimum percentage of a block's compressed bytes
+//           that must be wanted for the block to be accepted.
+// Stats:    accept/reject counters accumulated by this function.
+//           NOTE(review): the reserve() near the end subtracts
+//           Stats.AcceptedChunkCount from ChunkIndexes.size(), so Stats is
+//           presumably expected to be zero-initialized on entry — confirm
+//           with callers.
+// ChunkHashes/ChunkIndexes: the wanted chunks; ChunkIndexes selects entries
+//           of ChunkHashes.
+// OutUnusedChunkIndexes: receives every wanted chunk index not covered by an
+//           accepted block.
+// Returns:  indexes into KnownBlocks of the accepted blocks.
+std::vector<size_t>
+FindReuseBlocks(OperationLogOutput& Output,
+                const uint8_t BlockReuseMinPercentLimit,
+                const bool IsVerbose,
+                ReuseBlocksStatistics& Stats,
+                const std::vector<ChunkBlockDescription>& KnownBlocks,
+                std::span<const IoHash> ChunkHashes,
+                std::span<const uint32_t> ChunkIndexes,
+                std::vector<uint32_t>& OutUnusedChunkIndexes)
+{
+    ZEN_TRACE_CPU("FindReuseBlocks");
+
+    // Find all blocks with a usage level higher than MinPercentLimit
+    // Pick out the blocks with usage higher or equal to MinPercentLimit
+    // Sort them with highest size usage - most usage first
+    // Make a list of all chunks and mark them as not found
+    // For each block, recalculate the block's usage percent based on the chunks still marked as not found
+    // If the block still reaches MinPercentLimit, keep it and remove the matching chunks from the not found list
+    // Repeat for all remaining blocks that initially matched MinPercentLimit
+
+    std::vector<size_t> FilteredReuseBlockIndexes;
+
+    uint32_t ChunkCount = gsl::narrow<uint32_t>(ChunkHashes.size());
+    std::vector<bool> ChunkFound(ChunkCount, false);
+
+    if (ChunkCount > 0)
+    {
+        if (!KnownBlocks.empty())
+        {
+            Stopwatch ReuseTimer;
+
+            // Wanted chunk hash -> index into ChunkHashes, for O(1) membership
+            // tests against each block's attachment list below.
+            tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
+            ChunkHashToChunkIndex.reserve(ChunkIndexes.size());
+            for (uint32_t ChunkIndex : ChunkIndexes)
+            {
+                ChunkHashToChunkIndex.insert_or_assign(ChunkHashes[ChunkIndex], ChunkIndex);
+            }
+
+            std::vector<size_t> BlockSizes(KnownBlocks.size(), 0);
+            std::vector<size_t> BlockUseSize(KnownBlocks.size(), 0);
+
+            std::vector<size_t> ReuseBlockIndexes;
+
+            // Pass 1: measure each block's reuse level against the full wanted
+            // set, and keep as candidates the blocks that meet
+            // BlockReuseMinPercentLimit.
+            for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
+            {
+                const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
+
+                // Skip placeholder (zero-hash) blocks and malformed blocks
+                // whose hash and length arrays disagree.
+                if (KnownBlock.BlockHash != IoHash::Zero && KnownBlock.ChunkRawHashes.size() == KnownBlock.ChunkCompressedLengths.size())
+                {
+                    size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size();
+                    if (BlockAttachmentCount == 0)
+                    {
+                        continue;
+                    }
+                    size_t ReuseSize = 0;
+                    size_t BlockSize = 0;
+                    size_t FoundAttachmentCount = 0;
+                    size_t BlockChunkCount = KnownBlock.ChunkRawHashes.size();
+                    for (size_t BlockChunkIndex = 0; BlockChunkIndex < BlockChunkCount; BlockChunkIndex++)
+                    {
+                        const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex];
+                        const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
+                        BlockSize += BlockChunkSize;
+                        if (ChunkHashToChunkIndex.contains(BlockChunkHash))
+                        {
+                            ReuseSize += BlockChunkSize;
+                            FoundAttachmentCount++;
+                        }
+                    }
+
+                    // Integer percentage of the block's compressed bytes that
+                    // are wanted. NOTE(review): divides by BlockSize — a block
+                    // whose chunk lengths are all zero would fault; presumably
+                    // such blocks cannot occur, confirm upstream.
+                    size_t ReusePercent = (ReuseSize * 100) / BlockSize;
+
+                    if (ReusePercent >= BlockReuseMinPercentLimit)
+                    {
+                        if (IsVerbose)
+                        {
+                            ZEN_OPERATION_LOG_INFO(Output,
+                                                   "Reusing block {}. {} attachments found, usage level: {}%",
+                                                   KnownBlock.BlockHash,
+                                                   FoundAttachmentCount,
+                                                   ReusePercent);
+                        }
+                        ReuseBlockIndexes.push_back(KnownBlockIndex);
+
+                        BlockSizes[KnownBlockIndex] = BlockSize;
+                        BlockUseSize[KnownBlockIndex] = ReuseSize;
+                    }
+                    else if (FoundAttachmentCount > 0)
+                    {
+                        // if (IsVerbose)
+                        //{
+                        //    ZEN_OPERATION_LOG_INFO(Output, "Skipping block {}. {} attachments found, usage level: {}%",
+                        //    KnownBlock.BlockHash,
+                        //    FoundAttachmentCount, ReusePercent);
+                        //}
+                        Stats.RejectedBlockCount++;
+                        Stats.RejectedChunkCount += FoundAttachmentCount;
+                        Stats.RejectedByteCount += ReuseSize;
+                    }
+                }
+            }
+
+            if (!ReuseBlockIndexes.empty())
+            {
+                // Pass 2: visit candidates with the largest reusable byte
+                // count first, re-evaluating each one counting only chunks
+                // that no earlier accepted block already covers. This is what
+                // prevents the same chunk from being attributed to several
+                // blocks.
+                std::sort(ReuseBlockIndexes.begin(), ReuseBlockIndexes.end(), [&](size_t Lhs, size_t Rhs) {
+                    return BlockUseSize[Lhs] > BlockUseSize[Rhs];
+                });
+
+                for (size_t KnownBlockIndex : ReuseBlockIndexes)
+                {
+                    std::vector<uint32_t> FoundChunkIndexes;
+                    size_t BlockSize = 0;
+                    size_t AdjustedReuseSize = 0;
+                    size_t AdjustedRawReuseSize = 0;
+                    const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
+                    for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++)
+                    {
+                        const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex];
+                        const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
+                        BlockSize += BlockChunkSize;
+                        if (auto It = ChunkHashToChunkIndex.find(BlockChunkHash); It != ChunkHashToChunkIndex.end())
+                        {
+                            const uint32_t ChunkIndex = It->second;
+                            // Only count chunks not yet claimed by a
+                            // previously accepted block.
+                            if (!ChunkFound[ChunkIndex])
+                            {
+                                FoundChunkIndexes.push_back(ChunkIndex);
+                                AdjustedReuseSize += KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
+                                AdjustedRawReuseSize += KnownBlock.ChunkRawLengths[BlockChunkIndex];
+                            }
+                        }
+                    }
+
+                    size_t ReusePercent = (AdjustedReuseSize * 100) / BlockSize;
+
+                    if (ReusePercent >= BlockReuseMinPercentLimit)
+                    {
+                        if (IsVerbose)
+                        {
+                            ZEN_OPERATION_LOG_INFO(Output,
+                                                   "Reusing block {}. {} attachments found, usage level: {}%",
+                                                   KnownBlock.BlockHash,
+                                                   FoundChunkIndexes.size(),
+                                                   ReusePercent);
+                        }
+                        FilteredReuseBlockIndexes.push_back(KnownBlockIndex);
+
+                        // Claim this block's chunks so later candidates do not
+                        // count them again.
+                        for (uint32_t ChunkIndex : FoundChunkIndexes)
+                        {
+                            ChunkFound[ChunkIndex] = true;
+                        }
+                        Stats.AcceptedChunkCount += FoundChunkIndexes.size();
+                        Stats.AcceptedByteCount += AdjustedReuseSize;
+                        Stats.AcceptedRawByteCount += AdjustedRawReuseSize;
+                        // "Redundant" = chunks/bytes the block carries that we
+                        // do not need (already covered or never wanted).
+                        Stats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size();
+                        Stats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize;
+                    }
+                    else
+                    {
+                        // if (IsVerbose)
+                        //{
+                        //    ZEN_OPERATION_LOG_INFO(Output, "Skipping block {}. filtered usage level: {}%", KnownBlock.BlockHash,
+                        //    ReusePercent);
+                        //}
+                        Stats.RejectedBlockCount++;
+                        Stats.RejectedChunkCount += FoundChunkIndexes.size();
+                        Stats.RejectedByteCount += AdjustedReuseSize;
+                    }
+                }
+            }
+        }
+        // Report back every wanted chunk that no accepted block covers.
+        // NOTE(review): assumes Stats.AcceptedChunkCount reflects only this
+        // call; a pre-populated Stats would make this reserve underflow.
+        OutUnusedChunkIndexes.reserve(ChunkIndexes.size() - Stats.AcceptedChunkCount);
+        for (uint32_t ChunkIndex : ChunkIndexes)
+        {
+            if (!ChunkFound[ChunkIndex])
+            {
+                OutUnusedChunkIndexes.push_back(ChunkIndex);
+            }
+        }
+    }
+    return FilteredReuseBlockIndexes;
+}
+
#if ZEN_WITH_TESTS
namespace testutils {
@@ -310,6 +501,246 @@ TEST_CASE("project.store.block")
HeaderSize));
}
+// Exercises FindReuseBlocks against two synthetic blocks — one built from
+// small attachments, one from large — under varying chunk overlap and
+// BlockReuseMinPercentLimit values, checking both the accepted block count
+// and the set of chunks left uncovered.
+TEST_CASE("project.store.reuseblocks")
+{
+    using namespace std::literals;
+    using namespace testutils;
+
+    // Two attachment-size sets; the second uses ~10x larger sizes so the two
+    // generated blocks have clearly different byte weights.
+    std::vector<std::vector<std::size_t>> BlockAttachmentSizes(
+        {std::vector<std::size_t>{7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489,
+                                  7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251,
+                                  491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225},
+         {17633, 16825, 15738, 18031, 17225, 11566, 13656, 16006, 11124, 13466, 11093, 14269, 12257, 13685, 13489,
+          17194, 16151, 15482, 16217, 13511, 16738, 15061, 17537, 12759, 11916, 18210, 12235, 14024, 11582, 15251,
+          11491, 15464, 14607, 18135, 13767, 14045, 14415, 15007, 18876, 16761, 13359, 18526, 14097, 14855, 18225}});
+
+    // Generate one real chunk block (and its description) per size set.
+    std::vector<ChunkBlockDescription> BlockDescriptions;
+    for (auto& AttachmentSizes : BlockAttachmentSizes)
+    {
+        std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes);
+        std::vector<std::pair<IoHash, FetchChunkFunc>> Chunks;
+        Chunks.reserve(AttachmentSizes.size());
+        for (const auto& It : AttachmentsWithId)
+        {
+            Chunks.push_back(
+                std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
+                    return {Buffer.DecodeRawSize(), Buffer};
+                }));
+        }
+        ChunkBlockDescription Block;
+        CompressedBuffer BlockBuffer = GenerateChunkBlock(std::move(Chunks), Block);
+        BlockDescriptions.emplace_back(std::move(Block));
+    }
+
+    LoggerRef LogRef = Log();
+    std::unique_ptr<OperationLogOutput> LogOutput(CreateStandardLogOutput(LogRef));
+
+    {
+        // We use just about all the chunks - should result in use of both blocks
+        ReuseBlocksStatistics ReuseBlocksStats;
+        std::vector<IoHash> ManyChunkHashes;
+        ManyChunkHashes.insert(ManyChunkHashes.end(),
+                               BlockDescriptions[0].ChunkRawHashes.begin(),
+                               BlockDescriptions[0].ChunkRawHashes.end() - 1);
+        ManyChunkHashes.insert(ManyChunkHashes.end(),
+                               BlockDescriptions[1].ChunkRawHashes.begin() + 1,
+                               BlockDescriptions[1].ChunkRawHashes.end());
+        std::vector<uint32_t> ManyChunkIndexes;
+        ManyChunkIndexes.resize(ManyChunkHashes.size());
+        std::iota(ManyChunkIndexes.begin(), ManyChunkIndexes.end(), 0);
+        std::vector<uint32_t> UnusedChunkIndexes;
+
+        std::vector<size_t> ReusedBlocks = FindReuseBlocks(*LogOutput,
+                                                           80,
+                                                           false,
+                                                           ReuseBlocksStats,
+                                                           BlockDescriptions,
+                                                           ManyChunkHashes,
+                                                           ManyChunkIndexes,
+                                                           UnusedChunkIndexes);
+
+        CHECK_EQ(2u, ReusedBlocks.size());
+        CHECK_EQ(0u, UnusedChunkIndexes.size());
+    }
+
+    {
+        // We now only know about one of the blocks - the second block's
+        // chunks must come back as unused
+        ReuseBlocksStatistics ReuseBlocksStats;
+        std::vector<IoHash> ManyChunkHashes;
+        ManyChunkHashes.insert(ManyChunkHashes.end(),
+                               BlockDescriptions[0].ChunkRawHashes.begin(),
+                               BlockDescriptions[0].ChunkRawHashes.end() - 1);
+        ManyChunkHashes.insert(ManyChunkHashes.end(),
+                               BlockDescriptions[1].ChunkRawHashes.begin() + 1,
+                               BlockDescriptions[1].ChunkRawHashes.end());
+        std::vector<uint32_t> ManyChunkIndexes;
+        ManyChunkIndexes.resize(ManyChunkHashes.size());
+        std::iota(ManyChunkIndexes.begin(), ManyChunkIndexes.end(), 0);
+        std::vector<uint32_t> UnusedChunkIndexes;
+
+        std::vector<size_t> ReusedBlocks = FindReuseBlocks(*LogOutput,
+                                                           80,
+                                                           false,
+                                                           ReuseBlocksStats,
+                                                           std::vector<ChunkBlockDescription>{BlockDescriptions[0]},
+                                                           ManyChunkHashes,
+                                                           ManyChunkIndexes,
+                                                           UnusedChunkIndexes);
+
+        CHECK_EQ(1u, ReusedBlocks.size());
+        CHECK_EQ(BlockDescriptions[1].ChunkRawHashes.size() - 1, UnusedChunkIndexes.size());
+    }
+
+    {
+        // Want only ~half of each block's chunks, then vary the limit.
+        std::vector<IoHash> ManyChunkHashes;
+        ManyChunkHashes.insert(ManyChunkHashes.end(),
+                               BlockDescriptions[0].ChunkRawHashes.begin(),
+                               BlockDescriptions[0].ChunkRawHashes.end() - BlockDescriptions[0].ChunkRawHashes.size() / 2);
+        ManyChunkHashes.insert(ManyChunkHashes.end(),
+                               BlockDescriptions[1].ChunkRawHashes.begin() + BlockDescriptions[1].ChunkRawHashes.size() / 2,
+                               BlockDescriptions[1].ChunkRawHashes.end());
+        std::vector<uint32_t> ManyChunkIndexes;
+        ManyChunkIndexes.resize(ManyChunkHashes.size());
+        std::iota(ManyChunkIndexes.begin(), ManyChunkIndexes.end(), 0);
+
+        {
+            // We use half the chunks - should result in no use of blocks due to 80% limit
+            std::vector<uint32_t> UnusedChunkIndexes80Percent;
+            ReuseBlocksStatistics ReuseBlocksStats;
+            std::vector<size_t> ReusedBlocks80Percent = FindReuseBlocks(*LogOutput,
+                                                                        80,
+                                                                        false,
+                                                                        ReuseBlocksStats,
+                                                                        BlockDescriptions,
+                                                                        ManyChunkHashes,
+                                                                        ManyChunkIndexes,
+                                                                        UnusedChunkIndexes80Percent);
+
+            CHECK_EQ(0u, ReusedBlocks80Percent.size());
+            CHECK_EQ(ManyChunkHashes.size(), UnusedChunkIndexes80Percent.size());
+        }
+        {
+            // We use half the chunks - should result in use of both blocks due to 40% limit
+            std::vector<uint32_t> UnusedChunkIndexes40Percent;
+            ReuseBlocksStatistics ReuseBlocksStats;
+            std::vector<size_t> ReusedBlocks40Percent = FindReuseBlocks(*LogOutput,
+                                                                        40,
+                                                                        false,
+                                                                        ReuseBlocksStats,
+                                                                        BlockDescriptions,
+                                                                        ManyChunkHashes,
+                                                                        ManyChunkIndexes,
+                                                                        UnusedChunkIndexes40Percent);
+
+            CHECK_EQ(2u, ReusedBlocks40Percent.size());
+            CHECK_EQ(0u, UnusedChunkIndexes40Percent.size());
+        }
+    }
+
+    {
+        // Asymmetric overlap: half of block 0's chunks, almost all of block 1's.
+        std::vector<IoHash> ManyChunkHashes;
+        ManyChunkHashes.insert(ManyChunkHashes.end(),
+                               BlockDescriptions[0].ChunkRawHashes.begin(),
+                               BlockDescriptions[0].ChunkRawHashes.end() - BlockDescriptions[0].ChunkRawHashes.size() / 2);
+        ManyChunkHashes.insert(ManyChunkHashes.end(),
+                               BlockDescriptions[1].ChunkRawHashes.begin() + 1,
+                               BlockDescriptions[1].ChunkRawHashes.end());
+        std::vector<uint32_t> ManyChunkIndexes;
+        ManyChunkIndexes.resize(ManyChunkHashes.size());
+        std::iota(ManyChunkIndexes.begin(), ManyChunkIndexes.end(), 0);
+
+        {
+            // We use half the chunks for first block - should result in use of one block due to 80% limit
+            ReuseBlocksStatistics ReuseBlocksStats;
+            std::vector<uint32_t> UnusedChunkIndexes80Percent;
+            std::vector<size_t> ReusedBlocks80Percent = FindReuseBlocks(*LogOutput,
+                                                                        80,
+                                                                        false,
+                                                                        ReuseBlocksStats,
+                                                                        BlockDescriptions,
+                                                                        ManyChunkHashes,
+                                                                        ManyChunkIndexes,
+                                                                        UnusedChunkIndexes80Percent);
+
+            CHECK_EQ(1u, ReusedBlocks80Percent.size());
+            CHECK_EQ(BlockDescriptions[0].ChunkRawHashes.size() - BlockDescriptions[0].ChunkRawHashes.size() / 2,
+                     UnusedChunkIndexes80Percent.size());
+        }
+        {
+            // We use half the chunks - should result in use of both blocks due to 40% limit
+            ReuseBlocksStatistics ReuseBlocksStats;
+            std::vector<uint32_t> UnusedChunkIndexes40Percent;
+            std::vector<size_t> ReusedBlocks40Percent = FindReuseBlocks(*LogOutput,
+                                                                        40,
+                                                                        false,
+                                                                        ReuseBlocksStats,
+                                                                        BlockDescriptions,
+                                                                        ManyChunkHashes,
+                                                                        ManyChunkIndexes,
+                                                                        UnusedChunkIndexes40Percent);
+
+            CHECK_EQ(2u, ReusedBlocks40Percent.size());
+            CHECK_EQ(0u, UnusedChunkIndexes40Percent.size());
+        }
+    }
+
+    {
+        // Simulate the effect of ThinkChunkBlockDescriptions: drop the header
+        // size and force every chunk length to 1, so selection depends on
+        // chunk counts rather than byte sizes. The same expectations as the
+        // previous scenario must still hold.
+
+        for (ChunkBlockDescription& BlockDescription : BlockDescriptions)
+        {
+            BlockDescription.HeaderSize = 0;
+            BlockDescription.ChunkRawLengths = std::vector<uint32_t>(BlockDescription.ChunkRawHashes.size(), 1);
+            BlockDescription.ChunkCompressedLengths = std::vector<uint32_t>(BlockDescription.ChunkRawHashes.size(), 1);
+        }
+
+        std::vector<IoHash> ManyChunkHashes;
+        ManyChunkHashes.insert(ManyChunkHashes.end(),
+                               BlockDescriptions[0].ChunkRawHashes.begin(),
+                               BlockDescriptions[0].ChunkRawHashes.end() - BlockDescriptions[0].ChunkRawHashes.size() / 2);
+        ManyChunkHashes.insert(ManyChunkHashes.end(),
+                               BlockDescriptions[1].ChunkRawHashes.begin() + 1,
+                               BlockDescriptions[1].ChunkRawHashes.end());
+        std::vector<uint32_t> ManyChunkIndexes;
+        ManyChunkIndexes.resize(ManyChunkHashes.size());
+        std::iota(ManyChunkIndexes.begin(), ManyChunkIndexes.end(), 0);
+
+        {
+            // We use half the chunks for first block - should result in use of one block due to 80% limit
+            ReuseBlocksStatistics ReuseBlocksStats;
+            std::vector<uint32_t> UnusedChunkIndexes80Percent;
+            std::vector<size_t> ReusedBlocks80Percent = FindReuseBlocks(*LogOutput,
+                                                                        80,
+                                                                        false,
+                                                                        ReuseBlocksStats,
+                                                                        BlockDescriptions,
+                                                                        ManyChunkHashes,
+                                                                        ManyChunkIndexes,
+                                                                        UnusedChunkIndexes80Percent);
+
+            CHECK_EQ(1u, ReusedBlocks80Percent.size());
+            CHECK_EQ(BlockDescriptions[0].ChunkRawHashes.size() - BlockDescriptions[0].ChunkRawHashes.size() / 2,
+                     UnusedChunkIndexes80Percent.size());
+        }
+        {
+            // We use half the chunks - should result in use of both blocks due to 40% limit
+            ReuseBlocksStatistics ReuseBlocksStats;
+            std::vector<uint32_t> UnusedChunkIndexes40Percent;
+            std::vector<size_t> ReusedBlocks40Percent = FindReuseBlocks(*LogOutput,
+                                                                        40,
+                                                                        false,
+                                                                        ReuseBlocksStats,
+                                                                        BlockDescriptions,
+                                                                        ManyChunkHashes,
+                                                                        ManyChunkIndexes,
+                                                                        UnusedChunkIndexes40Percent);
+
+            CHECK_EQ(2u, ReusedBlocks40Percent.size());
+            CHECK_EQ(0u, UnusedChunkIndexes40Percent.size());
+        }
+    }
+}
+
void
chunkblock_forcelink()
{