aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/blockstore.cpp
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2024-05-17 10:55:56 +0200
committer GitHub Enterprise <[email protected]> 2024-05-17 10:55:56 +0200
commit baf9891624b758b686e6b1c284013bea08794d69 (patch)
tree 732f00270fa5264565a8535cea911d034fc12ba9 /src/zenstore/blockstore.cpp
parent safer partial requests (#82) (diff)
download zen-baf9891624b758b686e6b1c284013bea08794d69.tar.xz
zen-baf9891624b758b686e6b1c284013bea08794d69.zip
refactor BlockStore IterateChunks (#77)
Improvement: Refactored IterateChunks so it can be reused in diskcachelayer, and hid the public GetBlockFile() function in BlockStore
Diffstat (limited to 'src/zenstore/blockstore.cpp')
-rw-r--r-- src/zenstore/blockstore.cpp 385
1 files changed, 196 insertions, 189 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index ee58e6c5d..aef7b348b 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -8,7 +8,6 @@
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zencore/workthreadpool.h>
#include <algorithm>
#include <unordered_map>
@@ -23,6 +22,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/compactbinarybuilder.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
# include <random>
#endif
@@ -1037,39 +1037,24 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
}
}
-void
-BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
- const IterateChunksSmallSizeCallback& AsyncSmallSizeCallback,
- const IterateChunksLargeSizeCallback& AsyncLargeSizeCallback,
- WorkerThreadPool* OptionalWorkerPool)
+bool
+BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
+ std::span<const size_t> InChunkIndexes,
+ const IterateChunksSmallSizeCallback& SmallSizeCallback,
+ const IterateChunksLargeSizeCallback& LargeSizeCallback)
{
- ZEN_TRACE_CPU("BlockStore::IterateChunks");
-
- Stopwatch Timer;
- auto _ = MakeGuard([&]() {
- ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath);
-
- std::unordered_map<uint32_t, size_t> BlockIndexToBlockChunks;
- std::vector<std::pair<uint32_t, std::vector<size_t>>> BlocksChunks;
-
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex)
+ if (InChunkIndexes.empty())
{
- const BlockStoreLocation& Location = ChunkLocations[ChunkIndex];
- if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end())
- {
- BlocksChunks[It->second].second.push_back(ChunkIndex);
- }
- else
- {
- BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size()));
- BlocksChunks.push_back(std::make_pair(Location.BlockIndex, std::vector<size_t>({ChunkIndex})));
- }
+ return true;
}
- auto GetNextRange = [&ChunkLocations](const std::vector<size_t>& ChunkIndexes, size_t StartIndexOffset) -> size_t {
+ uint32_t BlockIndex = ChunkLocations[InChunkIndexes[0]].BlockIndex;
+ std::vector<size_t> ChunkIndexes(InChunkIndexes.begin(), InChunkIndexes.end());
+ std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
+ return ChunkLocations[IndexA].Offset < ChunkLocations[IndexB].Offset;
+ });
+
+ auto GetNextRange = [&ChunkLocations](std::span<const size_t> ChunkIndexes, size_t StartIndexOffset) -> size_t {
size_t ChunkCount = 0;
size_t StartIndex = ChunkIndexes[StartIndexOffset];
const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex];
@@ -1094,112 +1079,126 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
return ChunkCount;
};
- auto ScanBlock = [&](const uint32_t BlockIndex, std::vector<size_t>& ChunkIndexes) {
- std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
- const BlockStoreLocation& LocationA = ChunkLocations[IndexA];
- const BlockStoreLocation& LocationB = ChunkLocations[IndexB];
- if (LocationA.BlockIndex < LocationB.BlockIndex)
- {
- return true;
- }
- else if (LocationA.BlockIndex > LocationB.BlockIndex)
- {
- return false;
- }
- return LocationA.Offset < LocationB.Offset;
- });
- RwLock::SharedLockScope InsertLock(m_InsertLock);
- auto FindBlockIt = m_ChunkBlocks.find(BlockIndex);
- if (FindBlockIt == m_ChunkBlocks.end())
- {
- InsertLock.ReleaseNow();
+ RwLock::SharedLockScope InsertLock(m_InsertLock);
+ auto FindBlockIt = m_ChunkBlocks.find(BlockIndex);
+ if (FindBlockIt == m_ChunkBlocks.end())
+ {
+ InsertLock.ReleaseNow();
- ZEN_LOG_SCOPE("block #{} not available", BlockIndex);
+ ZEN_LOG_SCOPE("block #{} not available", BlockIndex);
- for (size_t ChunkIndex : ChunkIndexes)
+ for (size_t ChunkIndex : ChunkIndexes)
+ {
+ if (!SmallSizeCallback(ChunkIndex, nullptr, 0))
{
- AsyncSmallSizeCallback(ChunkIndex, nullptr, 0);
+ return false;
}
}
- else
- {
- const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second;
- ZEN_ASSERT(BlockFile);
+ }
+ else
+ {
+ const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second;
+ ZEN_ASSERT(BlockFile);
- IoBuffer ReadBuffer{IterateSmallChunkWindowSize};
- void* BufferBase = ReadBuffer.MutableData();
+ IoBuffer ReadBuffer{IterateSmallChunkWindowSize};
+ void* BufferBase = ReadBuffer.MutableData();
- size_t LocationIndexOffset = 0;
- while (LocationIndexOffset < ChunkIndexes.size())
- {
- size_t ChunkIndex = ChunkIndexes[LocationIndexOffset];
- const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];
+ size_t LocationIndexOffset = 0;
+ while (LocationIndexOffset < ChunkIndexes.size())
+ {
+ size_t ChunkIndex = ChunkIndexes[LocationIndexOffset];
+ const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];
- const size_t BlockSize = BlockFile->FileSize();
- const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset);
- if (RangeCount > 0)
+ const size_t BlockSize = BlockFile->FileSize();
+ const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset);
+ if (RangeCount > 0)
+ {
+ size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1];
+ const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex];
+ uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
+ BlockFile->Read(BufferBase, Size, FirstLocation.Offset);
+ for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
{
- size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1];
- const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex];
- uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
- BlockFile->Read(BufferBase, Size, FirstLocation.Offset);
- for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
+ size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex];
+ const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex];
+ if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize))
{
- size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex];
- const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex];
- if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize))
- {
- ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
- ChunkLocation.Offset,
- ChunkLocation.Size,
- BlockIndex,
- BlockSize);
-
- AsyncSmallSizeCallback(NextChunkIndex, nullptr, 0);
- continue;
- }
- void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset];
- AsyncSmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size);
+ ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
+ ChunkLocation.Offset,
+ ChunkLocation.Size,
+ BlockIndex,
+ BlockSize);
+
+ SmallSizeCallback(NextChunkIndex, nullptr, 0);
+ continue;
}
- LocationIndexOffset += RangeCount;
- continue;
+ void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset];
+ SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size);
}
- if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize))
- {
- ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
- FirstLocation.Offset,
- FirstLocation.Size,
- BlockIndex,
- BlockSize);
-
- AsyncSmallSizeCallback(ChunkIndex, nullptr, 0);
- LocationIndexOffset++;
- continue;
- }
- AsyncLargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size);
+ LocationIndexOffset += RangeCount;
+ continue;
+ }
+ if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize))
+ {
+ ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
+ FirstLocation.Offset,
+ FirstLocation.Size,
+ BlockIndex,
+ BlockSize);
+
+ SmallSizeCallback(ChunkIndex, nullptr, 0);
LocationIndexOffset++;
+ continue;
+ }
+ if (!LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size))
+ {
+ return false;
}
+ LocationIndexOffset++;
}
- };
+ }
+ return true;
+}
- Latch WorkLatch(1);
- for (auto& BlockChunks : BlocksChunks)
+bool
+BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocations, const IterateChunksCallback& Callback)
+{
+ ZEN_TRACE_CPU("BlockStore::IterateChunks");
+
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() {
+ ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath);
+
+ std::unordered_map<uint32_t, size_t> BlockIndexToBlockChunks;
+ std::vector<std::vector<size_t>> BlocksChunks;
+
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex)
{
- if (OptionalWorkerPool)
+ const BlockStoreLocation& Location = ChunkLocations[ChunkIndex];
+ if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end())
{
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([&ChunkLocations, &BlockChunks, &ScanBlock, &WorkLatch]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- ScanBlock(BlockChunks.first, BlockChunks.second);
- });
+ BlocksChunks[It->second].push_back(ChunkIndex);
}
else
{
- ScanBlock(BlockChunks.first, BlockChunks.second);
+ BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size()));
+ BlocksChunks.push_back(std::vector<size_t>({ChunkIndex}));
}
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+
+ for (auto& BlockChunks : BlocksChunks)
+ {
+ ZEN_ASSERT(!BlockChunks.empty());
+ uint32_t BlockIndex = ChunkLocations[BlockChunks[0]].BlockIndex;
+ if (!Callback(BlockIndex, BlockChunks))
+ {
+ return false;
+ }
+ }
+ return true;
}
void
@@ -1513,17 +1512,6 @@ BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint
return Path.ToPath();
}
-Ref<BlockStoreFile>
-BlockStore::GetBlockFile(uint32_t BlockIndex)
-{
- RwLock::SharedLockScope _(m_InsertLock);
- if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end())
- {
- return It->second;
- }
- return {};
-}
-
#if ZEN_WITH_TESTS
TEST_CASE("blockstore.blockstoredisklocation")
@@ -1872,73 +1860,92 @@ TEST_CASE("blockstore.iterate.chunks")
WorkerThreadPool WorkerPool(4);
- Store.IterateChunks(
- {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex},
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- switch (ChunkIndex)
- {
- case 0:
- CHECK(Data);
- CHECK(Size == FirstChunkData.size());
- CHECK(std::string((const char*)Data, Size) == FirstChunkData);
- break;
- case 1:
- CHECK(Data);
- CHECK(Size == SecondChunkData.size());
- CHECK(std::string((const char*)Data, Size) == SecondChunkData);
- break;
- case 2:
- CHECK(false);
- break;
- case 3:
- CHECK(!Data);
- break;
- case 4:
- CHECK(!Data);
- break;
- case 5:
- CHECK(!Data);
- break;
- default:
- CHECK(false);
- break;
- }
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- switch (ChunkIndex)
- {
- case 0:
- case 1:
- CHECK(false);
- break;
- case 2:
+ std::vector<BlockStoreLocation> Locations{FirstChunkLocation,
+ SecondChunkLocation,
+ VeryLargeChunkLocation,
+ BadLocationZeroSize,
+ BadLocationOutOfRange,
+ BadBlockIndex};
+ Latch WorkLatch(1);
+ Store.IterateChunks(Locations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
+ WorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ bool Continue = Store.IterateBlock(
+ Locations,
+ ChunkIndexes,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
+ switch (ChunkIndex)
{
- CHECK(Size == VeryLargeChunk.size());
- char* Buffer = new char[Size];
- size_t HashOffset = 0;
- File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) {
- memcpy(&Buffer[HashOffset], Data, Size);
- HashOffset += Size;
- });
- CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0);
- delete[] Buffer;
+ case 0:
+ CHECK(Data);
+ CHECK(Size == FirstChunkData.size());
+ CHECK(std::string((const char*)Data, Size) == FirstChunkData);
+ break;
+ case 1:
+ CHECK(Data);
+ CHECK(Size == SecondChunkData.size());
+ CHECK(std::string((const char*)Data, Size) == SecondChunkData);
+ break;
+ case 2:
+ CHECK(false);
+ break;
+ case 3:
+ CHECK(!Data);
+ break;
+ case 4:
+ CHECK(!Data);
+ break;
+ case 5:
+ CHECK(!Data);
+ break;
+ default:
+ CHECK(false);
+ break;
}
- break;
- case 3:
- CHECK(false);
- break;
- case 4:
- CHECK(false);
- break;
- case 5:
- CHECK(false);
- break;
- default:
- CHECK(false);
- break;
- }
- },
- &WorkerPool);
+ return true;
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
+ switch (ChunkIndex)
+ {
+ case 0:
+ case 1:
+ CHECK(false);
+ break;
+ case 2:
+ {
+ CHECK(Size == VeryLargeChunk.size());
+ char* Buffer = new char[Size];
+ size_t HashOffset = 0;
+ File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) {
+ memcpy(&Buffer[HashOffset], Data, Size);
+ HashOffset += Size;
+ });
+ CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0);
+ delete[] Buffer;
+ }
+ break;
+ case 3:
+ CHECK(false);
+ break;
+ case 4:
+ CHECK(false);
+ break;
+ case 5:
+ CHECK(false);
+ break;
+ default:
+ CHECK(false);
+ break;
+ }
+ return true;
+ });
+ CHECK(Continue);
+ });
+ return true;
+ });
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
}
TEST_CASE("blockstore.reclaim.space")