diff options
| author | Dan Engelbrecht <[email protected]> | 2025-12-15 13:20:21 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-12-15 13:20:21 +0100 |
| commit | a715d3ab7701e6257730a73c62567052d21c9771 (patch) | |
| tree | 1f6b1de9c7cf11ec1403187d77d74a3b1af52a39 /src/zenremotestore/chunking | |
| parent | show download source data (#689) (diff) | |
| download | zen-a715d3ab7701e6257730a73c62567052d21c9771.tar.xz zen-a715d3ab7701e6257730a73c62567052d21c9771.zip | |
oplog download size (#690)
- Bugfix: Upload of oplogs could reference multiple blocks for the same chunk, causing redundant downloads of blocks
- Improvement: Use the improved block reuse selection function from zen builds upload in zen oplog-export to reduce oplog download size
Diffstat (limited to 'src/zenremotestore/chunking')
| -rw-r--r-- | src/zenremotestore/chunking/chunkblock.cpp | 431 |
1 files changed, 431 insertions, 0 deletions
diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp index 05ae13de1..a5d0db205 100644 --- a/src/zenremotestore/chunking/chunkblock.cpp +++ b/src/zenremotestore/chunking/chunkblock.cpp @@ -5,14 +5,23 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/timer.h> +#include <zencore/trace.h> + +#include <zenremotestore/operationlogoutput.h> #include <vector> +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + #if ZEN_WITH_TESTS # include <zencore/testing.h> # include <zencore/testutils.h> # include <unordered_map> +# include <numeric> #endif // ZEN_WITH_TESTS namespace zen { @@ -261,6 +270,188 @@ IterateChunkBlock(const SharedBuffer& BlockPayload, return true; }; +std::vector<size_t> +FindReuseBlocks(OperationLogOutput& Output, + const uint8_t BlockReuseMinPercentLimit, + const bool IsVerbose, + ReuseBlocksStatistics& Stats, + const std::vector<ChunkBlockDescription>& KnownBlocks, + std::span<const IoHash> ChunkHashes, + std::span<const uint32_t> ChunkIndexes, + std::vector<uint32_t>& OutUnusedChunkIndexes) +{ + ZEN_TRACE_CPU("FindReuseBlocks"); + + // Find all blocks with a usage level higher than MinPercentLimit + // Pick out the blocks with usage higher or equal to MinPercentLimit + // Sort them with highest size usage - most usage first + // Make a list of all chunks and mark them as not found + // For each block, recalculate the block has usage percent based on the chunks marked as not found + // If the block still reaches MinPercentLimit, keep it and remove the matching chunks from the not found list + // Repeat for following all remaining block that initially matched MinPercentLimit + + std::vector<size_t> FilteredReuseBlockIndexes; + + uint32_t ChunkCount = gsl::narrow<uint32_t>(ChunkHashes.size()); + std::vector<bool> ChunkFound(ChunkCount, false); + + if (ChunkCount > 0) + { + if (!KnownBlocks.empty()) 
+ { + Stopwatch ReuseTimer; + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; + ChunkHashToChunkIndex.reserve(ChunkIndexes.size()); + for (uint32_t ChunkIndex : ChunkIndexes) + { + ChunkHashToChunkIndex.insert_or_assign(ChunkHashes[ChunkIndex], ChunkIndex); + } + + std::vector<size_t> BlockSizes(KnownBlocks.size(), 0); + std::vector<size_t> BlockUseSize(KnownBlocks.size(), 0); + + std::vector<size_t> ReuseBlockIndexes; + + for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) + { + const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + + if (KnownBlock.BlockHash != IoHash::Zero && KnownBlock.ChunkRawHashes.size() == KnownBlock.ChunkCompressedLengths.size()) + { + size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size(); + if (BlockAttachmentCount == 0) + { + continue; + } + size_t ReuseSize = 0; + size_t BlockSize = 0; + size_t FoundAttachmentCount = 0; + size_t BlockChunkCount = KnownBlock.ChunkRawHashes.size(); + for (size_t BlockChunkIndex = 0; BlockChunkIndex < BlockChunkCount; BlockChunkIndex++) + { + const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; + const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; + BlockSize += BlockChunkSize; + if (ChunkHashToChunkIndex.contains(BlockChunkHash)) + { + ReuseSize += BlockChunkSize; + FoundAttachmentCount++; + } + } + + size_t ReusePercent = (ReuseSize * 100) / BlockSize; + + if (ReusePercent >= BlockReuseMinPercentLimit) + { + if (IsVerbose) + { + ZEN_OPERATION_LOG_INFO(Output, + "Reusing block {}. {} attachments found, usage level: {}%", + KnownBlock.BlockHash, + FoundAttachmentCount, + ReusePercent); + } + ReuseBlockIndexes.push_back(KnownBlockIndex); + + BlockSizes[KnownBlockIndex] = BlockSize; + BlockUseSize[KnownBlockIndex] = ReuseSize; + } + else if (FoundAttachmentCount > 0) + { + // if (IsVerbose) + //{ + // ZEN_OPERATION_LOG_INFO(Output, "Skipping block {}. 
{} attachments found, usage level: {}%", + // KnownBlock.BlockHash, + // FoundAttachmentCount, ReusePercent); + //} + Stats.RejectedBlockCount++; + Stats.RejectedChunkCount += FoundAttachmentCount; + Stats.RejectedByteCount += ReuseSize; + } + } + } + + if (!ReuseBlockIndexes.empty()) + { + std::sort(ReuseBlockIndexes.begin(), ReuseBlockIndexes.end(), [&](size_t Lhs, size_t Rhs) { + return BlockUseSize[Lhs] > BlockUseSize[Rhs]; + }); + + for (size_t KnownBlockIndex : ReuseBlockIndexes) + { + std::vector<uint32_t> FoundChunkIndexes; + size_t BlockSize = 0; + size_t AdjustedReuseSize = 0; + size_t AdjustedRawReuseSize = 0; + const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++) + { + const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; + const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; + BlockSize += BlockChunkSize; + if (auto It = ChunkHashToChunkIndex.find(BlockChunkHash); It != ChunkHashToChunkIndex.end()) + { + const uint32_t ChunkIndex = It->second; + if (!ChunkFound[ChunkIndex]) + { + FoundChunkIndexes.push_back(ChunkIndex); + AdjustedReuseSize += KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; + AdjustedRawReuseSize += KnownBlock.ChunkRawLengths[BlockChunkIndex]; + } + } + } + + size_t ReusePercent = (AdjustedReuseSize * 100) / BlockSize; + + if (ReusePercent >= BlockReuseMinPercentLimit) + { + if (IsVerbose) + { + ZEN_OPERATION_LOG_INFO(Output, + "Reusing block {}. 
{} attachments found, usage level: {}%", + KnownBlock.BlockHash, + FoundChunkIndexes.size(), + ReusePercent); + } + FilteredReuseBlockIndexes.push_back(KnownBlockIndex); + + for (uint32_t ChunkIndex : FoundChunkIndexes) + { + ChunkFound[ChunkIndex] = true; + } + Stats.AcceptedChunkCount += FoundChunkIndexes.size(); + Stats.AcceptedByteCount += AdjustedReuseSize; + Stats.AcceptedRawByteCount += AdjustedRawReuseSize; + Stats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size(); + Stats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize; + } + else + { + // if (IsVerbose) + //{ + // ZEN_OPERATION_LOG_INFO(Output, "Skipping block {}. filtered usage level: {}%", KnownBlock.BlockHash, + // ReusePercent); + //} + Stats.RejectedBlockCount++; + Stats.RejectedChunkCount += FoundChunkIndexes.size(); + Stats.RejectedByteCount += AdjustedReuseSize; + } + } + } + } + OutUnusedChunkIndexes.reserve(ChunkIndexes.size() - Stats.AcceptedChunkCount); + for (uint32_t ChunkIndex : ChunkIndexes) + { + if (!ChunkFound[ChunkIndex]) + { + OutUnusedChunkIndexes.push_back(ChunkIndex); + } + } + } + return FilteredReuseBlockIndexes; +} + #if ZEN_WITH_TESTS namespace testutils { @@ -310,6 +501,246 @@ TEST_CASE("project.store.block") HeaderSize)); } +TEST_CASE("project.store.reuseblocks") +{ + using namespace std::literals; + using namespace testutils; + + std::vector<std::vector<std::size_t>> BlockAttachmentSizes( + {std::vector<std::size_t>{7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489, + 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251, + 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}, + {17633, 16825, 15738, 18031, 17225, 11566, 13656, 16006, 11124, 13466, 11093, 14269, 12257, 13685, 13489, + 17194, 16151, 15482, 16217, 13511, 16738, 15061, 17537, 12759, 11916, 18210, 12235, 14024, 11582, 15251, + 11491, 15464, 14607, 18135, 
13767, 14045, 14415, 15007, 18876, 16761, 13359, 18526, 14097, 14855, 18225}}); + + std::vector<ChunkBlockDescription> BlockDescriptions; + for (auto& AttachmentSizes : BlockAttachmentSizes) + { + std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes); + std::vector<std::pair<IoHash, FetchChunkFunc>> Chunks; + Chunks.reserve(AttachmentSizes.size()); + for (const auto& It : AttachmentsWithId) + { + Chunks.push_back( + std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return {Buffer.DecodeRawSize(), Buffer}; + })); + } + ChunkBlockDescription Block; + CompressedBuffer BlockBuffer = GenerateChunkBlock(std::move(Chunks), Block); + BlockDescriptions.emplace_back(std::move(Block)); + } + + LoggerRef LogRef = Log(); + std::unique_ptr<OperationLogOutput> LogOutput(CreateStandardLogOutput(LogRef)); + + { + // We use just about all the chunks - should result in use of both blocks + ReuseBlocksStatistics ReuseBlocksStats; + std::vector<IoHash> ManyChunkHashes; + ManyChunkHashes.insert(ManyChunkHashes.end(), + BlockDescriptions[0].ChunkRawHashes.begin(), + BlockDescriptions[0].ChunkRawHashes.end() - 1); + ManyChunkHashes.insert(ManyChunkHashes.end(), + BlockDescriptions[1].ChunkRawHashes.begin() + 1, + BlockDescriptions[1].ChunkRawHashes.end()); + std::vector<uint32_t> ManyChunkIndexes; + ManyChunkIndexes.resize(ManyChunkHashes.size()); + std::iota(ManyChunkIndexes.begin(), ManyChunkIndexes.end(), 0); + std::vector<uint32_t> UnusedChunkIndexes; + + std::vector<size_t> ReusedBlocks = FindReuseBlocks(*LogOutput, + 80, + false, + ReuseBlocksStats, + BlockDescriptions, + ManyChunkHashes, + ManyChunkIndexes, + UnusedChunkIndexes); + + CHECK_EQ(2u, ReusedBlocks.size()); + CHECK_EQ(0u, UnusedChunkIndexes.size()); + } + + { + // We now only about one of the blocks + ReuseBlocksStatistics ReuseBlocksStats; + std::vector<IoHash> ManyChunkHashes; + 
ManyChunkHashes.insert(ManyChunkHashes.end(), + BlockDescriptions[0].ChunkRawHashes.begin(), + BlockDescriptions[0].ChunkRawHashes.end() - 1); + ManyChunkHashes.insert(ManyChunkHashes.end(), + BlockDescriptions[1].ChunkRawHashes.begin() + 1, + BlockDescriptions[1].ChunkRawHashes.end()); + std::vector<uint32_t> ManyChunkIndexes; + ManyChunkIndexes.resize(ManyChunkHashes.size()); + std::iota(ManyChunkIndexes.begin(), ManyChunkIndexes.end(), 0); + std::vector<uint32_t> UnusedChunkIndexes; + + std::vector<size_t> ReusedBlocks = FindReuseBlocks(*LogOutput, + 80, + false, + ReuseBlocksStats, + std::vector<ChunkBlockDescription>{BlockDescriptions[0]}, + ManyChunkHashes, + ManyChunkIndexes, + UnusedChunkIndexes); + + CHECK_EQ(1u, ReusedBlocks.size()); + CHECK_EQ(BlockDescriptions[1].ChunkRawHashes.size() - 1, UnusedChunkIndexes.size()); + } + + { + std::vector<IoHash> ManyChunkHashes; + ManyChunkHashes.insert(ManyChunkHashes.end(), + BlockDescriptions[0].ChunkRawHashes.begin(), + BlockDescriptions[0].ChunkRawHashes.end() - BlockDescriptions[0].ChunkRawHashes.size() / 2); + ManyChunkHashes.insert(ManyChunkHashes.end(), + BlockDescriptions[1].ChunkRawHashes.begin() + BlockDescriptions[1].ChunkRawHashes.size() / 2, + BlockDescriptions[1].ChunkRawHashes.end()); + std::vector<uint32_t> ManyChunkIndexes; + ManyChunkIndexes.resize(ManyChunkHashes.size()); + std::iota(ManyChunkIndexes.begin(), ManyChunkIndexes.end(), 0); + + { + // We use half the chunks - should result in no use of blocks due to 80% limit + std::vector<uint32_t> UnusedChunkIndexes80Percent; + ReuseBlocksStatistics ReuseBlocksStats; + std::vector<size_t> ReusedBlocks80Percent = FindReuseBlocks(*LogOutput, + 80, + false, + ReuseBlocksStats, + BlockDescriptions, + ManyChunkHashes, + ManyChunkIndexes, + UnusedChunkIndexes80Percent); + + CHECK_EQ(0u, ReusedBlocks80Percent.size()); + CHECK_EQ(ManyChunkHashes.size(), UnusedChunkIndexes80Percent.size()); + } + { + // We use half the chunks - should result in use of both 
blocks due to 40% limit + std::vector<uint32_t> UnusedChunkIndexes40Percent; + ReuseBlocksStatistics ReuseBlocksStats; + std::vector<size_t> ReusedBlocks40Percent = FindReuseBlocks(*LogOutput, + 40, + false, + ReuseBlocksStats, + BlockDescriptions, + ManyChunkHashes, + ManyChunkIndexes, + UnusedChunkIndexes40Percent); + + CHECK_EQ(2u, ReusedBlocks40Percent.size()); + CHECK_EQ(0u, UnusedChunkIndexes40Percent.size()); + } + } + + { + std::vector<IoHash> ManyChunkHashes; + ManyChunkHashes.insert(ManyChunkHashes.end(), + BlockDescriptions[0].ChunkRawHashes.begin(), + BlockDescriptions[0].ChunkRawHashes.end() - BlockDescriptions[0].ChunkRawHashes.size() / 2); + ManyChunkHashes.insert(ManyChunkHashes.end(), + BlockDescriptions[1].ChunkRawHashes.begin() + 1, + BlockDescriptions[1].ChunkRawHashes.end()); + std::vector<uint32_t> ManyChunkIndexes; + ManyChunkIndexes.resize(ManyChunkHashes.size()); + std::iota(ManyChunkIndexes.begin(), ManyChunkIndexes.end(), 0); + + { + // We use half the chunks for first block - should result in use of one blocks due to 80% limit + ReuseBlocksStatistics ReuseBlocksStats; + std::vector<uint32_t> UnusedChunkIndexes80Percent; + std::vector<size_t> ReusedBlocks80Percent = FindReuseBlocks(*LogOutput, + 80, + false, + ReuseBlocksStats, + BlockDescriptions, + ManyChunkHashes, + ManyChunkIndexes, + UnusedChunkIndexes80Percent); + + CHECK_EQ(1u, ReusedBlocks80Percent.size()); + CHECK_EQ(BlockDescriptions[0].ChunkRawHashes.size() - BlockDescriptions[0].ChunkRawHashes.size() / 2, + UnusedChunkIndexes80Percent.size()); + } + { + // We use half the chunks - should result in use of both blocks due to 40% limit + ReuseBlocksStatistics ReuseBlocksStats; + std::vector<uint32_t> UnusedChunkIndexes40Percent; + std::vector<size_t> ReusedBlocks40Percent = FindReuseBlocks(*LogOutput, + 40, + false, + ReuseBlocksStats, + BlockDescriptions, + ManyChunkHashes, + ManyChunkIndexes, + UnusedChunkIndexes40Percent); + + CHECK_EQ(2u, ReusedBlocks40Percent.size()); + 
CHECK_EQ(0u, UnusedChunkIndexes40Percent.size()); + } + } + + { + // Test simulate ThinkChunkBlockDescriptions + + for (ChunkBlockDescription& BlockDescription : BlockDescriptions) + { + BlockDescription.HeaderSize = 0; + BlockDescription.ChunkRawLengths = std::vector<uint32_t>(BlockDescription.ChunkRawHashes.size(), 1); + BlockDescription.ChunkCompressedLengths = std::vector<uint32_t>(BlockDescription.ChunkRawHashes.size(), 1); + } + + std::vector<IoHash> ManyChunkHashes; + ManyChunkHashes.insert(ManyChunkHashes.end(), + BlockDescriptions[0].ChunkRawHashes.begin(), + BlockDescriptions[0].ChunkRawHashes.end() - BlockDescriptions[0].ChunkRawHashes.size() / 2); + ManyChunkHashes.insert(ManyChunkHashes.end(), + BlockDescriptions[1].ChunkRawHashes.begin() + 1, + BlockDescriptions[1].ChunkRawHashes.end()); + std::vector<uint32_t> ManyChunkIndexes; + ManyChunkIndexes.resize(ManyChunkHashes.size()); + std::iota(ManyChunkIndexes.begin(), ManyChunkIndexes.end(), 0); + + { + // We use half the chunks for first block - should result in use of one blocks due to 80% limit + ReuseBlocksStatistics ReuseBlocksStats; + std::vector<uint32_t> UnusedChunkIndexes80Percent; + std::vector<size_t> ReusedBlocks80Percent = FindReuseBlocks(*LogOutput, + 80, + false, + ReuseBlocksStats, + BlockDescriptions, + ManyChunkHashes, + ManyChunkIndexes, + UnusedChunkIndexes80Percent); + + CHECK_EQ(1u, ReusedBlocks80Percent.size()); + CHECK_EQ(BlockDescriptions[0].ChunkRawHashes.size() - BlockDescriptions[0].ChunkRawHashes.size() / 2, + UnusedChunkIndexes80Percent.size()); + } + { + // We use half the chunks - should result in use of both blocks due to 40% limit + ReuseBlocksStatistics ReuseBlocksStats; + std::vector<uint32_t> UnusedChunkIndexes40Percent; + std::vector<size_t> ReusedBlocks40Percent = FindReuseBlocks(*LogOutput, + 40, + false, + ReuseBlocksStats, + BlockDescriptions, + ManyChunkHashes, + ManyChunkIndexes, + UnusedChunkIndexes40Percent); + + CHECK_EQ(2u, ReusedBlocks40Percent.size()); + 
CHECK_EQ(0u, UnusedChunkIndexes40Percent.size()); + } + } +} + void chunkblock_forcelink() { |