diff options
| author | Dan Engelbrecht <[email protected]> | 2024-05-17 10:55:56 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-05-17 10:55:56 +0200 |
| commit | baf9891624b758b686e6b1c284013bea08794d69 (patch) | |
| tree | 732f00270fa5264565a8535cea911d034fc12ba9 /src/zenstore/blockstore.cpp | |
| parent | safer partial requests (#82) (diff) | |
| download | zen-baf9891624b758b686e6b1c284013bea08794d69.tar.xz zen-baf9891624b758b686e6b1c284013bea08794d69.zip | |
refactor BlockStore IterateChunks (#77)
Improvement: Refactored IterateChunks to allow reuse in diskcachelayer and hide public GetBlockFile() function in BlockStore
Diffstat (limited to 'src/zenstore/blockstore.cpp')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 385 |
1 files changed, 196 insertions, 189 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index ee58e6c5d..aef7b348b 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -8,7 +8,6 @@ #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> -#include <zencore/workthreadpool.h> #include <algorithm> #include <unordered_map> @@ -23,6 +22,7 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/compactbinarybuilder.h> # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zencore/workthreadpool.h> # include <random> #endif @@ -1037,39 +1037,24 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, } } -void -BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, - const IterateChunksSmallSizeCallback& AsyncSmallSizeCallback, - const IterateChunksLargeSizeCallback& AsyncLargeSizeCallback, - WorkerThreadPool* OptionalWorkerPool) +bool +BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, + std::span<const size_t> InChunkIndexes, + const IterateChunksSmallSizeCallback& SmallSizeCallback, + const IterateChunksLargeSizeCallback& LargeSizeCallback) { - ZEN_TRACE_CPU("BlockStore::IterateChunks"); - - Stopwatch Timer; - auto _ = MakeGuard([&]() { - ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath); - - std::unordered_map<uint32_t, size_t> BlockIndexToBlockChunks; - std::vector<std::pair<uint32_t, std::vector<size_t>>> BlocksChunks; - - for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) + if (InChunkIndexes.empty()) { - const BlockStoreLocation& Location = ChunkLocations[ChunkIndex]; - if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end()) - { - BlocksChunks[It->second].second.push_back(ChunkIndex); - } - else - { - BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size())); - BlocksChunks.push_back(std::make_pair(Location.BlockIndex, std::vector<size_t>({ChunkIndex}))); - } + return true; } - auto GetNextRange = [&ChunkLocations](const std::vector<size_t>& ChunkIndexes, size_t StartIndexOffset) -> size_t { + uint32_t BlockIndex = ChunkLocations[InChunkIndexes[0]].BlockIndex; + std::vector<size_t> ChunkIndexes(InChunkIndexes.begin(), InChunkIndexes.end()); + std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { + return ChunkLocations[IndexA].Offset < ChunkLocations[IndexB].Offset; + }); + + auto GetNextRange = [&ChunkLocations](std::span<const size_t> ChunkIndexes, size_t StartIndexOffset) -> size_t { size_t ChunkCount = 0; size_t StartIndex = ChunkIndexes[StartIndexOffset]; const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex]; @@ -1094,112 +1079,126 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, return ChunkCount; }; - auto ScanBlock = [&](const uint32_t BlockIndex, std::vector<size_t>& ChunkIndexes) { - std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { - const BlockStoreLocation& LocationA = ChunkLocations[IndexA]; - const BlockStoreLocation& LocationB = ChunkLocations[IndexB]; - if (LocationA.BlockIndex < LocationB.BlockIndex) - { - return true; - } - else if (LocationA.BlockIndex > LocationB.BlockIndex) - { - return false; - } - return LocationA.Offset < LocationB.Offset; - }); - RwLock::SharedLockScope InsertLock(m_InsertLock); - auto FindBlockIt = m_ChunkBlocks.find(BlockIndex); - if (FindBlockIt == m_ChunkBlocks.end()) - { - InsertLock.ReleaseNow(); + RwLock::SharedLockScope InsertLock(m_InsertLock); + auto FindBlockIt = m_ChunkBlocks.find(BlockIndex); + if (FindBlockIt == m_ChunkBlocks.end()) + { + InsertLock.ReleaseNow(); - ZEN_LOG_SCOPE("block #{} not available", BlockIndex); + ZEN_LOG_SCOPE("block #{} not available", BlockIndex); - for (size_t ChunkIndex : ChunkIndexes) + for (size_t ChunkIndex : ChunkIndexes) + { + if (!SmallSizeCallback(ChunkIndex, nullptr, 0)) { - AsyncSmallSizeCallback(ChunkIndex, nullptr, 0); + return false; } } - else - { - const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second; - ZEN_ASSERT(BlockFile); + } + else + { + const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second; + ZEN_ASSERT(BlockFile); - IoBuffer ReadBuffer{IterateSmallChunkWindowSize}; - void* BufferBase = ReadBuffer.MutableData(); + IoBuffer ReadBuffer{IterateSmallChunkWindowSize}; + void* BufferBase = ReadBuffer.MutableData(); - size_t LocationIndexOffset = 0; - while (LocationIndexOffset < ChunkIndexes.size()) - { - size_t ChunkIndex = ChunkIndexes[LocationIndexOffset]; - const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; + size_t LocationIndexOffset = 0; + while (LocationIndexOffset < ChunkIndexes.size()) + { + size_t ChunkIndex = ChunkIndexes[LocationIndexOffset]; + const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; - const size_t BlockSize = BlockFile->FileSize(); - const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset); - if (RangeCount > 0) + const size_t BlockSize = BlockFile->FileSize(); + const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset); + if (RangeCount > 0) + { + size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1]; + const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; + uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; + BlockFile->Read(BufferBase, Size, FirstLocation.Offset); + for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) { - size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1]; - const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; - uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; - BlockFile->Read(BufferBase, Size, FirstLocation.Offset); - for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) + size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex]; + const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; + if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize)) { - size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex]; - const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; - if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize)) - { - ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", - ChunkLocation.Offset, - ChunkLocation.Size, - BlockIndex, - BlockSize); - - AsyncSmallSizeCallback(NextChunkIndex, nullptr, 0); - continue; - } - void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; - AsyncSmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); + ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", + ChunkLocation.Offset, + ChunkLocation.Size, + BlockIndex, + BlockSize); + + SmallSizeCallback(NextChunkIndex, nullptr, 0); + continue; } - LocationIndexOffset += RangeCount; - continue; + void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; + SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); } - if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) - { - ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", - FirstLocation.Offset, - FirstLocation.Size, - BlockIndex, - BlockSize); - - AsyncSmallSizeCallback(ChunkIndex, nullptr, 0); - LocationIndexOffset++; - continue; - } - AsyncLargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size); + LocationIndexOffset += RangeCount; + continue; + } + if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) + { + ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", + FirstLocation.Offset, + FirstLocation.Size, + BlockIndex, + BlockSize); + + SmallSizeCallback(ChunkIndex, nullptr, 0); LocationIndexOffset++; + continue; + } + if (!LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size)) + { + return false; } + LocationIndexOffset++; } - }; + } + return true; +} - Latch WorkLatch(1); - for (auto& BlockChunks : BlocksChunks) +bool +BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocations, const IterateChunksCallback& Callback) +{ + ZEN_TRACE_CPU("BlockStore::IterateChunks"); + + Stopwatch Timer; + auto _ = MakeGuard([&]() { + ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath); + + std::unordered_map<uint32_t, size_t> BlockIndexToBlockChunks; + std::vector<std::vector<size_t>> BlocksChunks; + + for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) { - if (OptionalWorkerPool) + const BlockStoreLocation& Location = ChunkLocations[ChunkIndex]; + if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end()) { - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&ChunkLocations, &BlockChunks, &ScanBlock, &WorkLatch]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - ScanBlock(BlockChunks.first, BlockChunks.second); - }); + BlocksChunks[It->second].push_back(ChunkIndex); } else { - ScanBlock(BlockChunks.first, BlockChunks.second); + BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size())); + BlocksChunks.push_back(std::vector<size_t>({ChunkIndex})); } } - WorkLatch.CountDown(); - WorkLatch.Wait(); + + for (auto& BlockChunks : BlocksChunks) + { + ZEN_ASSERT(!BlockChunks.empty()); + uint32_t BlockIndex = ChunkLocations[BlockChunks[0]].BlockIndex; + if (!Callback(BlockIndex, BlockChunks)) + { + return false; + } + } + return true; } void @@ -1513,17 +1512,6 @@ BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint return Path.ToPath(); } -Ref<BlockStoreFile> -BlockStore::GetBlockFile(uint32_t BlockIndex) -{ - RwLock::SharedLockScope _(m_InsertLock); - if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end()) - { - return It->second; - } - return {}; -} - #if ZEN_WITH_TESTS TEST_CASE("blockstore.blockstoredisklocation") @@ -1872,73 +1860,92 @@ TEST_CASE("blockstore.iterate.chunks") WorkerThreadPool WorkerPool(4); - Store.IterateChunks( - {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex}, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - switch (ChunkIndex) - { - case 0: - CHECK(Data); - CHECK(Size == FirstChunkData.size()); - CHECK(std::string((const char*)Data, Size) == FirstChunkData); - break; - case 1: - CHECK(Data); - CHECK(Size == SecondChunkData.size()); - CHECK(std::string((const char*)Data, Size) == SecondChunkData); - break; - case 2: - CHECK(false); - break; - case 3: - CHECK(!Data); - break; - case 4: - CHECK(!Data); - break; - case 5: - CHECK(!Data); - break; - default: - CHECK(false); - break; - } - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - switch (ChunkIndex) - { - case 0: - case 1: - CHECK(false); - break; - case 2: + std::vector<BlockStoreLocation> Locations{FirstChunkLocation, + SecondChunkLocation, + VeryLargeChunkLocation, + BadLocationZeroSize, + BadLocationOutOfRange, + BadBlockIndex}; + Latch WorkLatch(1); + Store.IterateChunks(Locations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { + WorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + bool Continue = Store.IterateBlock( + Locations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { + switch (ChunkIndex) { - CHECK(Size == VeryLargeChunk.size()); - char* Buffer = new char[Size]; - size_t HashOffset = 0; - File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { - memcpy(&Buffer[HashOffset], Data, Size); - HashOffset += Size; - }); - CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); - delete[] Buffer; + case 0: + CHECK(Data); + CHECK(Size == FirstChunkData.size()); + CHECK(std::string((const char*)Data, Size) == FirstChunkData); + break; + case 1: + CHECK(Data); + CHECK(Size == SecondChunkData.size()); + CHECK(std::string((const char*)Data, Size) == SecondChunkData); + break; + case 2: + CHECK(false); + break; + case 3: + CHECK(!Data); + break; + case 4: + CHECK(!Data); + break; + case 5: + CHECK(!Data); + break; + default: + CHECK(false); + break; } - break; - case 3: - CHECK(false); - break; - case 4: - CHECK(false); - break; - case 5: - CHECK(false); - break; - default: - CHECK(false); - break; - } - }, - &WorkerPool); + return true; + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool { + switch (ChunkIndex) + { + case 0: + case 1: + CHECK(false); + break; + case 2: + { + CHECK(Size == VeryLargeChunk.size()); + char* Buffer = new char[Size]; + size_t HashOffset = 0; + File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { + memcpy(&Buffer[HashOffset], Data, Size); + HashOffset += Size; + }); + CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); + delete[] Buffer; + } + break; + case 3: + CHECK(false); + break; + case 4: + CHECK(false); + break; + case 5: + CHECK(false); + break; + default: + CHECK(false); + break; + } + return true; + }); + CHECK(Continue); + }); + return true; + }); + WorkLatch.CountDown(); + WorkLatch.Wait(); } TEST_CASE("blockstore.reclaim.space") |