diff options
Diffstat (limited to 'src/zenremotestore/chunking/chunkblock.cpp')
| -rw-r--r-- | src/zenremotestore/chunking/chunkblock.cpp | 540 |
1 files changed, 535 insertions, 5 deletions
diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp index c4d8653f4..06cedae3f 100644 --- a/src/zenremotestore/chunking/chunkblock.cpp +++ b/src/zenremotestore/chunking/chunkblock.cpp @@ -10,18 +10,17 @@ #include <zenremotestore/operationlogoutput.h> +#include <numeric> #include <vector> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> +#include <tsl/robin_set.h> ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include <zencore/testing.h> # include <zencore/testutils.h> - -# include <unordered_map> -# include <numeric> #endif // ZEN_WITH_TESTS namespace zen { @@ -455,6 +454,537 @@ FindReuseBlocks(OperationLogOutput& Output, return FilteredReuseBlockIndexes; } +ChunkBlockAnalyser::ChunkBlockAnalyser(OperationLogOutput& LogOutput, + std::span<const ChunkBlockDescription> BlockDescriptions, + const Options& Options) +: m_LogOutput(LogOutput) +, m_BlockDescriptions(BlockDescriptions) +, m_Options(Options) +{ +} + +std::vector<ChunkBlockAnalyser::NeededBlock> +ChunkBlockAnalyser::GetNeeded(const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToChunkIndex, + std::function<bool(uint32_t ChunkIndex)>&& NeedsBlockChunk) +{ + ZEN_TRACE_CPU("ChunkBlockAnalyser::GetNeeded"); + + std::vector<NeededBlock> Result; + + std::vector<bool> ChunkIsNeeded(ChunkHashToChunkIndex.size()); + for (uint32_t ChunkIndex = 0; ChunkIndex < ChunkHashToChunkIndex.size(); ChunkIndex++) + { + ChunkIsNeeded[ChunkIndex] = NeedsBlockChunk(ChunkIndex); + } + + std::vector<uint64_t> BlockSlack(m_BlockDescriptions.size(), 0u); + for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + uint64_t BlockUsedSize = 0; + uint64_t BlockSize = 0; + + for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) + { + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + if (auto It = ChunkHashToChunkIndex.find(ChunkHash); It != ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = It->second; + if (ChunkIsNeeded[RemoteChunkIndex]) + { + BlockUsedSize += BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + } + } + BlockSize += BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + } + BlockSlack[BlockIndex] = BlockSize - BlockUsedSize; + } + + std::vector<uint32_t> BlockOrder(m_BlockDescriptions.size()); + std::iota(BlockOrder.begin(), BlockOrder.end(), 0); + + std::sort(BlockOrder.begin(), BlockOrder.end(), [&BlockSlack](uint32_t Lhs, uint32_t Rhs) { + return BlockSlack[Lhs] < BlockSlack[Rhs]; + }); + + std::vector<bool> ChunkIsPickedUp(ChunkHashToChunkIndex.size(), false); + + for (uint32_t BlockIndex : BlockOrder) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + std::vector<uint32_t> BlockChunkIndexNeeded; + + for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) + { + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + if (auto It = ChunkHashToChunkIndex.find(ChunkHash); It != ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = It->second; + if (ChunkIsNeeded[RemoteChunkIndex]) + { + if (!ChunkIsPickedUp[RemoteChunkIndex]) + { + ChunkIsPickedUp[RemoteChunkIndex] = true; + BlockChunkIndexNeeded.push_back(ChunkBlockIndex); + } + } + } + else + { + ZEN_DEBUG("Chunk {} not found in block {}", ChunkHash, BlockDescription.BlockHash); + } + } + + if (!BlockChunkIndexNeeded.empty()) + { + Result.push_back(NeededBlock{.BlockIndex = BlockIndex, .ChunkIndexes = std::move(BlockChunkIndexNeeded)}); + } + } + return Result; +} + +ChunkBlockAnalyser::BlockResult +ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span<const NeededBlock> NeededBlocks, + std::span<const EPartialBlockDownloadMode> BlockPartialDownloadModes) +{ + ZEN_TRACE_CPU("ChunkBlockAnalyser::CalculatePartialBlockDownloads"); + + Stopwatch PartialAnalisysTimer; + + ChunkBlockAnalyser::BlockResult Result; + + uint64_t IdealDownloadTotalSize = 0; + uint64_t AllBlocksTotalBlocksSize = 0; + + for (const NeededBlock& NeededBlock : NeededBlocks) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; + + std::span<const uint32_t> BlockChunkIndexNeeded(NeededBlock.ChunkIndexes); + if (!NeededBlock.ChunkIndexes.empty()) + { + bool WantsToDoPartialBlockDownload = NeededBlock.ChunkIndexes.size() < BlockDescription.ChunkRawHashes.size(); + bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && + (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); + + EPartialBlockDownloadMode PartialBlockDownloadMode = BlockPartialDownloadModes[NeededBlock.BlockIndex]; + + const uint32_t ChunkStartOffsetInBlock = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(ChunkStartOffsetInBlock)); + + AllBlocksTotalBlocksSize += TotalBlockSize; + + if ((PartialBlockDownloadMode != EPartialBlockDownloadMode::Off) && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) + { + ZEN_TRACE_CPU("PartialBlockAnalysis"); + + uint64_t TotalWantedChunksSize = 0; + std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = CalculateBlockRanges(NeededBlock.BlockIndex, + BlockDescription, + NeededBlock.ChunkIndexes, + PartialBlockDownloadMode, + ChunkStartOffsetInBlock, + TotalBlockSize, + TotalWantedChunksSize); + ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize); + IdealDownloadTotalSize += TotalWantedChunksSize; + + if (MaybeBlockRanges.has_value()) + { + const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value(); + ZEN_ASSERT(!BlockRanges.empty()); + + uint64_t RequestedSize = + std::accumulate(BlockRanges.begin(), + BlockRanges.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + + if ((PartialBlockDownloadMode != EPartialBlockDownloadMode::Exact) && ((RequestedSize * 100) / TotalBlockSize) >= 200) + { + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Requesting {} chunks ({}) from block {} ({}) using full block request (extra bytes {})", + NeededBlock.ChunkIndexes.size(), + NiceBytes(RequestedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + NiceBytes(TotalBlockSize - TotalWantedChunksSize)); + } + Result.FullBlockIndexes.push_back(NeededBlock.BlockIndex); + } + else + { + Result.BlockRanges.insert(Result.BlockRanges.end(), BlockRanges.begin(), BlockRanges.end()); + + if (RequestedSize > TotalWantedChunksSize) + { + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})", + NeededBlock.ChunkIndexes.size(), + NiceBytes(RequestedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + BlockRanges.size(), + NiceBytes(RequestedSize - TotalWantedChunksSize)); + } + } + } + } + else + { + Result.FullBlockIndexes.push_back(NeededBlock.BlockIndex); + } + } + else + { + Result.FullBlockIndexes.push_back(NeededBlock.BlockIndex); + IdealDownloadTotalSize += TotalBlockSize; + } + } + } + + if (!Result.BlockRanges.empty() && !m_Options.IsQuiet) + { + tsl::robin_set<uint32_t> PartialBlockIndexes; + uint64_t PartialBlocksTotalSize = std::accumulate(Result.BlockRanges.begin(), + Result.BlockRanges.end(), + uint64_t(0u), + [&](uint64_t Current, const BlockRangeDescriptor& Range) { + PartialBlockIndexes.insert(Range.BlockIndex); + return Current + Range.RangeLength; + }); + + uint64_t FullBlocksTotalSize = + std::accumulate(Result.FullBlockIndexes.begin(), + Result.FullBlockIndexes.end(), + uint64_t(0u), + [&](uint64_t Current, uint32_t BlockIndex) { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + uint32_t CurrentOffset = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + return Current + std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(CurrentOffset)); + }); + + uint64_t PartialBlockRequestCount = Result.BlockRanges.size(); + uint64_t PartialBlockCount = PartialBlockIndexes.size(); + + uint64_t TotalExtraPartialBlocksRequestCount = PartialBlockRequestCount - PartialBlockCount; + uint64_t ActualPartialDownloadTotalSize = FullBlocksTotalSize + PartialBlocksTotalSize; + + uint64_t IdealSkippedSize = AllBlocksTotalBlocksSize - IdealDownloadTotalSize; + uint64_t ActualSkippedSize = AllBlocksTotalBlocksSize - ActualPartialDownloadTotalSize; + + double PercentOfIdealPartialSkippedSize = (ActualSkippedSize * 100.0) / IdealSkippedSize; + + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Analysis of partial block requests saves download of {} out of {}, {:.1f}% of possible {} using {} extra " + "requests. Completed in {}", + NiceBytes(ActualSkippedSize), + NiceBytes(AllBlocksTotalBlocksSize), + PercentOfIdealPartialSkippedSize, + NiceBytes(IdealSkippedSize), + TotalExtraPartialBlocksRequestCount, + NiceTimeSpanMs(PartialAnalisysTimer.GetElapsedTimeMs())); + } + + return Result; +} + +ChunkBlockAnalyser::BlockRangeDescriptor +ChunkBlockAnalyser::MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges) +{ + ZEN_ASSERT(Ranges.size() > 1); + const BlockRangeDescriptor& First = Ranges.front(); + const BlockRangeDescriptor& Last = Ranges.back(); + + return BlockRangeDescriptor{.BlockIndex = First.BlockIndex, + .RangeStart = First.RangeStart, + .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart, + .ChunkBlockIndexStart = First.ChunkBlockIndexStart, + .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart}; +} + +std::optional<std::vector<ChunkBlockAnalyser::BlockRangeDescriptor>> +ChunkBlockAnalyser::MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range) +{ + if (Range.RangeLength == TotalBlockSize) + { + return {}; + } + else + { + return std::vector<BlockRangeDescriptor>{Range}; + } +}; + +const ChunkBlockAnalyser::BlockRangeLimit* +ChunkBlockAnalyser::GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits, + uint64_t TotalBlockSize, + std::span<const BlockRangeDescriptor> Ranges) +{ + if (Ranges.size() > 1) + { + const std::uint64_t WantedSize = + std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { + return Current + Range.RangeLength; + }); + + const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize; + + for (const BlockRangeLimit& Limit : Limits) + { + if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount) + { + return &Limit; + } + } + } + return nullptr; +}; + +std::vector<ChunkBlockAnalyser::BlockRangeDescriptor> +ChunkBlockAnalyser::CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges) +{ + ZEN_ASSERT(BlockRanges.size() > 1); + std::vector<BlockRangeDescriptor> CollapsedBlockRanges; + + auto BlockRangesIt = BlockRanges.begin(); + CollapsedBlockRanges.push_back(*BlockRangesIt++); + for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++) + { + BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); + + const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength; + + const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); + if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap)) + { + LastRange.ChunkBlockIndexCount = + (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; + LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart; + } + else + { + CollapsedBlockRanges.push_back(*BlockRangesIt); + } + } + + return CollapsedBlockRanges; +}; + +uint64_t +ChunkBlockAnalyser::CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges) +{ + ZEN_ASSERT(BlockRanges.size() > 1); + uint64_t AcceptableGap = (uint64_t)-1; + for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++) + { + const BlockRangeDescriptor& Range = BlockRanges[RangeIndex]; + const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1]; + + const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength); + AcceptableGap = Min(Gap, AcceptableGap); + } + AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u); + return AcceptableGap; +}; + +std::optional<std::vector<ChunkBlockAnalyser::BlockRangeDescriptor>> +ChunkBlockAnalyser::CalculateBlockRanges(uint32_t BlockIndex, + const ChunkBlockDescription& BlockDescription, + std::span<const uint32_t> BlockChunkIndexNeeded, + EPartialBlockDownloadMode PartialBlockDownloadMode, + const uint64_t ChunkStartOffsetInBlock, + const uint64_t TotalBlockSize, + uint64_t& OutTotalWantedChunksSize) +{ + ZEN_TRACE_CPU("CalculateBlockRanges"); + + if (PartialBlockDownloadMode == EPartialBlockDownloadMode::Off) + { + return {}; + } + + std::vector<BlockRangeDescriptor> BlockRanges; + { + uint64_t CurrentOffset = ChunkStartOffsetInBlock; + uint32_t ChunkBlockIndex = 0; + uint32_t NeedBlockChunkIndexOffset = 0; + BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; + while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) + { + const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + { + if (NextRange.RangeLength > 0) + { + BlockRanges.push_back(NextRange); + NextRange = {.BlockIndex = BlockIndex}; + } + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + } + else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + { + if (NextRange.RangeLength == 0) + { + NextRange.RangeStart = CurrentOffset; + NextRange.ChunkBlockIndexStart = ChunkBlockIndex; + } + NextRange.RangeLength += ChunkCompressedLength; + NextRange.ChunkBlockIndexCount++; + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + NeedBlockChunkIndexOffset++; + } + else + { + ZEN_ASSERT(false); + } + } + if (NextRange.RangeLength > 0) + { + BlockRanges.push_back(NextRange); + } + } + ZEN_ASSERT(!BlockRanges.empty()); + + OutTotalWantedChunksSize = + std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { + return Current + Range.RangeLength; + }); + + double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize; + + if (BlockRanges.size() == 1) + { + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Range request of {} ({:.2f}%) using single range from block {} ({}) as is", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize)); + } + return BlockRanges; + } + + if (PartialBlockDownloadMode == EPartialBlockDownloadMode::Exact) + { + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Range request of {} ({:.2f}%) using {} ranges from block {} ({})", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockRanges.size(), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize)); + } + return BlockRanges; + } + + if (PartialBlockDownloadMode == EPartialBlockDownloadMode::SingleRange) + { + const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); + if (m_Options.IsVerbose) + { + const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; + const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; + + ZEN_OPERATION_LOG_INFO( + m_LogOutput, + "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting " + "{:.2f}% ({})", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockRanges.size(), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + NiceBytes(MergedRange.RangeLength), + RangeRequestedPercent, + WastedPercent, + NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); + } + return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); + } + + if (RangeWantedPercent > FullBlockRangePercentLimit) + { + const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); + if (m_Options.IsVerbose) + { + const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; + const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; + + ZEN_OPERATION_LOG_INFO( + m_LogOutput, + "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} " + "({:.2f}%) wasting {:.2f}% ({})", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockRanges.size(), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + FullBlockRangePercentLimit, + NiceBytes(MergedRange.RangeLength), + RangeRequestedPercent, + WastedPercent, + NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); + } + return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); + } + + std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges); + while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges)) + { + CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges); + } + + const std::uint64_t WantedCollapsedSize = + std::accumulate(CollapsedBlockRanges.begin(), + CollapsedBlockRanges.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + + const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize; + + if (m_Options.IsVerbose) + { + const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize; + + ZEN_OPERATION_LOG_INFO( + m_LogOutput, + "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% " + "({})", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockRanges.size(), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + NiceBytes(WantedCollapsedSize), + CollapsedRangeRequestedPercent, + CollapsedBlockRanges.size(), + WastedPercent, + NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize)); + } + return CollapsedBlockRanges; +} + #if ZEN_WITH_TESTS namespace testutils { @@ -476,7 +1006,7 @@ namespace testutils { } // namespace testutils -TEST_CASE("project.store.block") +TEST_CASE("chunkblock.block") { using namespace std::literals; using namespace testutils; @@ -504,7 +1034,7 @@ TEST_CASE("project.store.block") HeaderSize)); } -TEST_CASE("project.store.reuseblocks") +TEST_CASE("chunkblock.reuseblocks") { using namespace std::literals; using namespace testutils; |