diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-24 13:53:54 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-24 13:53:54 +0200 |
| commit | 1c0ddc112a6c18d411f1f3ae6d236ecc2bedcfaa (patch) | |
| tree | cfafeecb830a44a6d0870a217edabcc62d37669c /src/zenstore/blockstore.cpp | |
| parent | remove obsolete code (diff) | |
| download | zen-1c0ddc112a6c18d411f1f3ae6d236ecc2bedcfaa.tar.xz zen-1c0ddc112a6c18d411f1f3ae6d236ecc2bedcfaa.zip | |
iterate cas chunks (#59)
- Improvement: Reworked GetChunkInfos in oplog store to reduce disk thrashing and improve performance
Diffstat (limited to 'src/zenstore/blockstore.cpp')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 205 |
1 files changed, 122 insertions, 83 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 8ef9e842f..ee58e6c5d 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -8,6 +8,7 @@ #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> #include <algorithm> #include <unordered_map> @@ -22,7 +23,6 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/compactbinarybuilder.h> # include <zencore/testing.h> # include <zencore/testutils.h> -# include <zencore/workthreadpool.h> # include <random> #endif @@ -156,7 +156,7 @@ BlockStoreFile::IsOpen() const return !!m_IoBuffer; } -constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024; +constexpr uint64_t IterateSmallChunkWindowSize = 2 * 1024 * 1024; BlockStore::BlockStore() { @@ -1039,131 +1039,167 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, void BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, - const IterateChunksSmallSizeCallback& SmallSizeCallback, - const IterateChunksLargeSizeCallback& LargeSizeCallback) + const IterateChunksSmallSizeCallback& AsyncSmallSizeCallback, + const IterateChunksLargeSizeCallback& AsyncLargeSizeCallback, + WorkerThreadPool* OptionalWorkerPool) { ZEN_TRACE_CPU("BlockStore::IterateChunks"); + Stopwatch Timer; + auto _ = MakeGuard([&]() { + ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath); - std::vector<size_t> LocationIndexes; - LocationIndexes.reserve(ChunkLocations.size()); + std::unordered_map<uint32_t, size_t> BlockIndexToBlockChunks; + std::vector<std::pair<uint32_t, std::vector<size_t>>> BlocksChunks; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) { - LocationIndexes.push_back(ChunkIndex); - } - std::sort(LocationIndexes.begin(), LocationIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { - const BlockStoreLocation& LocationA = ChunkLocations[IndexA]; - const BlockStoreLocation& LocationB = ChunkLocations[IndexB]; - if (LocationA.BlockIndex < LocationB.BlockIndex) + const BlockStoreLocation& Location = ChunkLocations[ChunkIndex]; + if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end()) { - return true; + BlocksChunks[It->second].second.push_back(ChunkIndex); } - else if (LocationA.BlockIndex > LocationB.BlockIndex) + else { - return false; + BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size())); + BlocksChunks.push_back(std::make_pair(Location.BlockIndex, std::vector<size_t>({ChunkIndex}))); } - return LocationA.Offset < LocationB.Offset; - }); - - IoBuffer ReadBuffer{ScrubSmallChunkWindowSize}; - void* BufferBase = ReadBuffer.MutableData(); - - RwLock::SharedLockScope _(m_InsertLock); + } - auto GetNextRange = [&](size_t StartIndexOffset) { + auto GetNextRange = [&ChunkLocations](const std::vector<size_t>& ChunkIndexes, size_t StartIndexOffset) -> size_t { size_t ChunkCount = 0; - size_t StartIndex = LocationIndexes[StartIndexOffset]; + size_t StartIndex = ChunkIndexes[StartIndexOffset]; const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex]; uint64_t StartOffset = StartLocation.Offset; - while (StartIndexOffset + ChunkCount < LocationIndexes.size()) + uint64_t LastEnd = StartOffset + StartLocation.Size; + while (StartIndexOffset + ChunkCount < ChunkIndexes.size()) { - size_t NextIndex = LocationIndexes[StartIndexOffset + ChunkCount]; + size_t NextIndex = ChunkIndexes[StartIndexOffset + ChunkCount]; const BlockStoreLocation& Location = ChunkLocations[NextIndex]; - if (Location.BlockIndex != StartLocation.BlockIndex) + ZEN_ASSERT(Location.BlockIndex == StartLocation.BlockIndex); + if (Location.Offset >= (LastEnd + (4u * 1024u))) { break; } - if ((Location.Offset + Location.Size) - StartOffset > ScrubSmallChunkWindowSize) + if ((Location.Offset + Location.Size) - StartOffset > IterateSmallChunkWindowSize) { break; } + LastEnd = Location.Offset + Location.Size; ++ChunkCount; } return ChunkCount; }; - size_t LocationIndexOffset = 0; - while (LocationIndexOffset < LocationIndexes.size()) - { - size_t ChunkIndex = LocationIndexes[LocationIndexOffset]; - const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; - - const uint32_t BlockIndex = FirstLocation.BlockIndex; - auto FindBlockIt = m_ChunkBlocks.find(BlockIndex); + auto ScanBlock = [&](const uint32_t BlockIndex, std::vector<size_t>& ChunkIndexes) { + std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { + const BlockStoreLocation& LocationA = ChunkLocations[IndexA]; + const BlockStoreLocation& LocationB = ChunkLocations[IndexB]; + if (LocationA.BlockIndex < LocationB.BlockIndex) + { + return true; + } + else if (LocationA.BlockIndex > LocationB.BlockIndex) + { + return false; + } + return LocationA.Offset < LocationB.Offset; + }); + RwLock::SharedLockScope InsertLock(m_InsertLock); + auto FindBlockIt = m_ChunkBlocks.find(BlockIndex); if (FindBlockIt == m_ChunkBlocks.end()) { + InsertLock.ReleaseNow(); + ZEN_LOG_SCOPE("block #{} not available", BlockIndex); - while (ChunkLocations[ChunkIndex].BlockIndex == BlockIndex) + for (size_t ChunkIndex : ChunkIndexes) { - SmallSizeCallback(ChunkIndex, nullptr, 0); - LocationIndexOffset++; - if (LocationIndexOffset == LocationIndexes.size()) - { - break; - } - ChunkIndex = LocationIndexes[LocationIndexOffset]; + AsyncSmallSizeCallback(ChunkIndex, nullptr, 0); } - continue; } - const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second; - ZEN_ASSERT(BlockFile); - - const size_t BlockSize = BlockFile->FileSize(); - const size_t RangeCount = GetNextRange(LocationIndexOffset); - if (RangeCount > 0) + else { - size_t LastChunkIndex = LocationIndexes[LocationIndexOffset + RangeCount - 1]; - const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; - uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; - BlockFile->Read(BufferBase, Size, FirstLocation.Offset); - for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) + const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second; + ZEN_ASSERT(BlockFile); + + IoBuffer ReadBuffer{IterateSmallChunkWindowSize}; + void* BufferBase = ReadBuffer.MutableData(); + + size_t LocationIndexOffset = 0; + while (LocationIndexOffset < ChunkIndexes.size()) { - size_t NextChunkIndex = LocationIndexes[LocationIndexOffset + RangeIndex]; - const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; - if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize)) + size_t ChunkIndex = ChunkIndexes[LocationIndexOffset]; + const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; + + const size_t BlockSize = BlockFile->FileSize(); + const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset); + if (RangeCount > 0) + { + size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1]; + const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; + uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; + BlockFile->Read(BufferBase, Size, FirstLocation.Offset); + for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) + { + size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex]; + const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; + if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize)) + { + ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", + ChunkLocation.Offset, + ChunkLocation.Size, + BlockIndex, + BlockSize); + + AsyncSmallSizeCallback(NextChunkIndex, nullptr, 0); + continue; + } + void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; + AsyncSmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); + } + LocationIndexOffset += RangeCount; + continue; + } + if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) { ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", - ChunkLocation.Offset, - ChunkLocation.Size, + FirstLocation.Offset, + FirstLocation.Size, BlockIndex, BlockSize); - SmallSizeCallback(NextChunkIndex, nullptr, 0); + AsyncSmallSizeCallback(ChunkIndex, nullptr, 0); + LocationIndexOffset++; continue; } - void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; - SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); + AsyncLargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size); + LocationIndexOffset++; } - LocationIndexOffset += RangeCount; - continue; } - if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) - { - ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", - FirstLocation.Offset, - FirstLocation.Size, - BlockIndex, - BlockSize); + }; - SmallSizeCallback(ChunkIndex, nullptr, 0); - LocationIndexOffset++; - continue; + Latch WorkLatch(1); + for (auto& BlockChunks : BlocksChunks) + { + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&ChunkLocations, &BlockChunks, &ScanBlock, &WorkLatch]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + ScanBlock(BlockChunks.first, BlockChunks.second); + }); + } + else + { + ScanBlock(BlockChunks.first, BlockChunks.second); } - LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size); - LocationIndexOffset++; } + WorkLatch.CountDown(); + WorkLatch.Wait(); } void @@ -1814,7 +1850,7 @@ TEST_CASE("blockstore.iterate.chunks") auto RootDirectory = TempDir.Path(); BlockStore Store; - Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024); + Store.Initialize(RootDirectory / "store", IterateSmallChunkWindowSize * 2, 1024); IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512}); CHECK(!BadChunk); @@ -1825,15 +1861,17 @@ TEST_CASE("blockstore.iterate.chunks") BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); Store.Flush(/*ForceNewBlock*/ false); - std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L'); + std::string VeryLargeChunk(IterateSmallChunkWindowSize * 2, 'L'); BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0}; BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0, - .Offset = ScrubSmallChunkWindowSize, - .Size = ScrubSmallChunkWindowSize * 2}; + .Offset = IterateSmallChunkWindowSize, + .Size = IterateSmallChunkWindowSize * 2}; BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024}; + WorkerThreadPool WorkerPool(4); + Store.IterateChunks( {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex}, [&](size_t ChunkIndex, const void* Data, uint64_t Size) { @@ -1899,7 +1937,8 @@ TEST_CASE("blockstore.iterate.chunks") CHECK(false); break; } - }); + }, + &WorkerPool); } TEST_CASE("blockstore.reclaim.space") |