aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-05-17 10:55:56 +0200
committerGitHub Enterprise <[email protected]>2024-05-17 10:55:56 +0200
commitbaf9891624b758b686e6b1c284013bea08794d69 (patch)
tree732f00270fa5264565a8535cea911d034fc12ba9 /src
parentsafer partial requests (#82) (diff)
downloadzen-baf9891624b758b686e6b1c284013bea08794d69.tar.xz
zen-baf9891624b758b686e6b1c284013bea08794d69.zip
refactor BlockStore IterateChunks (#77)
Improvement: Refactored IterateChunks to allow reuse in diskcachelayer and hide public GetBlockFile() function in BlockStore
Diffstat (limited to 'src')
-rw-r--r--src/zenstore/blockstore.cpp385
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp106
-rw-r--r--src/zenstore/compactcas.cpp73
-rw-r--r--src/zenstore/include/zenstore/blockstore.h23
4 files changed, 308 insertions, 279 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index ee58e6c5d..aef7b348b 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -8,7 +8,6 @@
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zencore/workthreadpool.h>
#include <algorithm>
#include <unordered_map>
@@ -23,6 +22,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/compactbinarybuilder.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
# include <random>
#endif
@@ -1037,39 +1037,24 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
}
}
-void
-BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
- const IterateChunksSmallSizeCallback& AsyncSmallSizeCallback,
- const IterateChunksLargeSizeCallback& AsyncLargeSizeCallback,
- WorkerThreadPool* OptionalWorkerPool)
+bool
+BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
+ std::span<const size_t> InChunkIndexes,
+ const IterateChunksSmallSizeCallback& SmallSizeCallback,
+ const IterateChunksLargeSizeCallback& LargeSizeCallback)
{
- ZEN_TRACE_CPU("BlockStore::IterateChunks");
-
- Stopwatch Timer;
- auto _ = MakeGuard([&]() {
- ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath);
-
- std::unordered_map<uint32_t, size_t> BlockIndexToBlockChunks;
- std::vector<std::pair<uint32_t, std::vector<size_t>>> BlocksChunks;
-
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex)
+ if (InChunkIndexes.empty())
{
- const BlockStoreLocation& Location = ChunkLocations[ChunkIndex];
- if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end())
- {
- BlocksChunks[It->second].second.push_back(ChunkIndex);
- }
- else
- {
- BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size()));
- BlocksChunks.push_back(std::make_pair(Location.BlockIndex, std::vector<size_t>({ChunkIndex})));
- }
+ return true;
}
- auto GetNextRange = [&ChunkLocations](const std::vector<size_t>& ChunkIndexes, size_t StartIndexOffset) -> size_t {
+ uint32_t BlockIndex = ChunkLocations[InChunkIndexes[0]].BlockIndex;
+ std::vector<size_t> ChunkIndexes(InChunkIndexes.begin(), InChunkIndexes.end());
+ std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
+ return ChunkLocations[IndexA].Offset < ChunkLocations[IndexB].Offset;
+ });
+
+ auto GetNextRange = [&ChunkLocations](std::span<const size_t> ChunkIndexes, size_t StartIndexOffset) -> size_t {
size_t ChunkCount = 0;
size_t StartIndex = ChunkIndexes[StartIndexOffset];
const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex];
@@ -1094,112 +1079,126 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
return ChunkCount;
};
- auto ScanBlock = [&](const uint32_t BlockIndex, std::vector<size_t>& ChunkIndexes) {
- std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
- const BlockStoreLocation& LocationA = ChunkLocations[IndexA];
- const BlockStoreLocation& LocationB = ChunkLocations[IndexB];
- if (LocationA.BlockIndex < LocationB.BlockIndex)
- {
- return true;
- }
- else if (LocationA.BlockIndex > LocationB.BlockIndex)
- {
- return false;
- }
- return LocationA.Offset < LocationB.Offset;
- });
- RwLock::SharedLockScope InsertLock(m_InsertLock);
- auto FindBlockIt = m_ChunkBlocks.find(BlockIndex);
- if (FindBlockIt == m_ChunkBlocks.end())
- {
- InsertLock.ReleaseNow();
+ RwLock::SharedLockScope InsertLock(m_InsertLock);
+ auto FindBlockIt = m_ChunkBlocks.find(BlockIndex);
+ if (FindBlockIt == m_ChunkBlocks.end())
+ {
+ InsertLock.ReleaseNow();
- ZEN_LOG_SCOPE("block #{} not available", BlockIndex);
+ ZEN_LOG_SCOPE("block #{} not available", BlockIndex);
- for (size_t ChunkIndex : ChunkIndexes)
+ for (size_t ChunkIndex : ChunkIndexes)
+ {
+ if (!SmallSizeCallback(ChunkIndex, nullptr, 0))
{
- AsyncSmallSizeCallback(ChunkIndex, nullptr, 0);
+ return false;
}
}
- else
- {
- const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second;
- ZEN_ASSERT(BlockFile);
+ }
+ else
+ {
+ const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second;
+ ZEN_ASSERT(BlockFile);
- IoBuffer ReadBuffer{IterateSmallChunkWindowSize};
- void* BufferBase = ReadBuffer.MutableData();
+ IoBuffer ReadBuffer{IterateSmallChunkWindowSize};
+ void* BufferBase = ReadBuffer.MutableData();
- size_t LocationIndexOffset = 0;
- while (LocationIndexOffset < ChunkIndexes.size())
- {
- size_t ChunkIndex = ChunkIndexes[LocationIndexOffset];
- const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];
+ size_t LocationIndexOffset = 0;
+ while (LocationIndexOffset < ChunkIndexes.size())
+ {
+ size_t ChunkIndex = ChunkIndexes[LocationIndexOffset];
+ const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];
- const size_t BlockSize = BlockFile->FileSize();
- const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset);
- if (RangeCount > 0)
+ const size_t BlockSize = BlockFile->FileSize();
+ const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset);
+ if (RangeCount > 0)
+ {
+ size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1];
+ const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex];
+ uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
+ BlockFile->Read(BufferBase, Size, FirstLocation.Offset);
+ for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
{
- size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1];
- const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex];
- uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
- BlockFile->Read(BufferBase, Size, FirstLocation.Offset);
- for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
+ size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex];
+ const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex];
+ if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize))
{
- size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex];
- const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex];
- if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize))
- {
- ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
- ChunkLocation.Offset,
- ChunkLocation.Size,
- BlockIndex,
- BlockSize);
-
- AsyncSmallSizeCallback(NextChunkIndex, nullptr, 0);
- continue;
- }
- void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset];
- AsyncSmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size);
+ ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
+ ChunkLocation.Offset,
+ ChunkLocation.Size,
+ BlockIndex,
+ BlockSize);
+
+ SmallSizeCallback(NextChunkIndex, nullptr, 0);
+ continue;
}
- LocationIndexOffset += RangeCount;
- continue;
+ void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset];
+ SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size);
}
- if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize))
- {
- ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
- FirstLocation.Offset,
- FirstLocation.Size,
- BlockIndex,
- BlockSize);
-
- AsyncSmallSizeCallback(ChunkIndex, nullptr, 0);
- LocationIndexOffset++;
- continue;
- }
- AsyncLargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size);
+ LocationIndexOffset += RangeCount;
+ continue;
+ }
+ if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize))
+ {
+ ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
+ FirstLocation.Offset,
+ FirstLocation.Size,
+ BlockIndex,
+ BlockSize);
+
+ SmallSizeCallback(ChunkIndex, nullptr, 0);
LocationIndexOffset++;
+ continue;
+ }
+ if (!LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size))
+ {
+ return false;
}
+ LocationIndexOffset++;
}
- };
+ }
+ return true;
+}
- Latch WorkLatch(1);
- for (auto& BlockChunks : BlocksChunks)
+bool
+BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocations, const IterateChunksCallback& Callback)
+{
+ ZEN_TRACE_CPU("BlockStore::IterateChunks");
+
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() {
+ ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath);
+
+ std::unordered_map<uint32_t, size_t> BlockIndexToBlockChunks;
+ std::vector<std::vector<size_t>> BlocksChunks;
+
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex)
{
- if (OptionalWorkerPool)
+ const BlockStoreLocation& Location = ChunkLocations[ChunkIndex];
+ if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end())
{
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([&ChunkLocations, &BlockChunks, &ScanBlock, &WorkLatch]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- ScanBlock(BlockChunks.first, BlockChunks.second);
- });
+ BlocksChunks[It->second].push_back(ChunkIndex);
}
else
{
- ScanBlock(BlockChunks.first, BlockChunks.second);
+ BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size()));
+ BlocksChunks.push_back(std::vector<size_t>({ChunkIndex}));
}
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+
+ for (auto& BlockChunks : BlocksChunks)
+ {
+ ZEN_ASSERT(!BlockChunks.empty());
+ uint32_t BlockIndex = ChunkLocations[BlockChunks[0]].BlockIndex;
+ if (!Callback(BlockIndex, BlockChunks))
+ {
+ return false;
+ }
+ }
+ return true;
}
void
@@ -1513,17 +1512,6 @@ BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint
return Path.ToPath();
}
-Ref<BlockStoreFile>
-BlockStore::GetBlockFile(uint32_t BlockIndex)
-{
- RwLock::SharedLockScope _(m_InsertLock);
- if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end())
- {
- return It->second;
- }
- return {};
-}
-
#if ZEN_WITH_TESTS
TEST_CASE("blockstore.blockstoredisklocation")
@@ -1872,73 +1860,92 @@ TEST_CASE("blockstore.iterate.chunks")
WorkerThreadPool WorkerPool(4);
- Store.IterateChunks(
- {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex},
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- switch (ChunkIndex)
- {
- case 0:
- CHECK(Data);
- CHECK(Size == FirstChunkData.size());
- CHECK(std::string((const char*)Data, Size) == FirstChunkData);
- break;
- case 1:
- CHECK(Data);
- CHECK(Size == SecondChunkData.size());
- CHECK(std::string((const char*)Data, Size) == SecondChunkData);
- break;
- case 2:
- CHECK(false);
- break;
- case 3:
- CHECK(!Data);
- break;
- case 4:
- CHECK(!Data);
- break;
- case 5:
- CHECK(!Data);
- break;
- default:
- CHECK(false);
- break;
- }
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- switch (ChunkIndex)
- {
- case 0:
- case 1:
- CHECK(false);
- break;
- case 2:
+ std::vector<BlockStoreLocation> Locations{FirstChunkLocation,
+ SecondChunkLocation,
+ VeryLargeChunkLocation,
+ BadLocationZeroSize,
+ BadLocationOutOfRange,
+ BadBlockIndex};
+ Latch WorkLatch(1);
+ Store.IterateChunks(Locations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
+ WorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ bool Continue = Store.IterateBlock(
+ Locations,
+ ChunkIndexes,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
+ switch (ChunkIndex)
{
- CHECK(Size == VeryLargeChunk.size());
- char* Buffer = new char[Size];
- size_t HashOffset = 0;
- File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) {
- memcpy(&Buffer[HashOffset], Data, Size);
- HashOffset += Size;
- });
- CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0);
- delete[] Buffer;
+ case 0:
+ CHECK(Data);
+ CHECK(Size == FirstChunkData.size());
+ CHECK(std::string((const char*)Data, Size) == FirstChunkData);
+ break;
+ case 1:
+ CHECK(Data);
+ CHECK(Size == SecondChunkData.size());
+ CHECK(std::string((const char*)Data, Size) == SecondChunkData);
+ break;
+ case 2:
+ CHECK(false);
+ break;
+ case 3:
+ CHECK(!Data);
+ break;
+ case 4:
+ CHECK(!Data);
+ break;
+ case 5:
+ CHECK(!Data);
+ break;
+ default:
+ CHECK(false);
+ break;
}
- break;
- case 3:
- CHECK(false);
- break;
- case 4:
- CHECK(false);
- break;
- case 5:
- CHECK(false);
- break;
- default:
- CHECK(false);
- break;
- }
- },
- &WorkerPool);
+ return true;
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
+ switch (ChunkIndex)
+ {
+ case 0:
+ case 1:
+ CHECK(false);
+ break;
+ case 2:
+ {
+ CHECK(Size == VeryLargeChunk.size());
+ char* Buffer = new char[Size];
+ size_t HashOffset = 0;
+ File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) {
+ memcpy(&Buffer[HashOffset], Data, Size);
+ HashOffset += Size;
+ });
+ CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0);
+ delete[] Buffer;
+ }
+ break;
+ case 3:
+ CHECK(false);
+ break;
+ case 4:
+ CHECK(false);
+ break;
+ case 5:
+ CHECK(false);
+ break;
+ default:
+ CHECK(false);
+ break;
+ }
+ return true;
+ });
+ CHECK(Continue);
+ });
+ return true;
+ });
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
}
TEST_CASE("blockstore.reclaim.space")
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 6cb749b5a..2e307118b 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1754,7 +1754,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
}
}
- const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void {
+ const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
ChunkCount.fetch_add(1);
VerifiedChunkBytes.fetch_add(Size);
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
@@ -1762,18 +1762,18 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
// ChunkLocation out of range of stored blocks
ReportBadKey(Hash);
- return;
+ return true;
}
if (!Size)
{
ReportBadKey(Hash);
- return;
+ return true;
}
IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
if (!Buffer)
{
ReportBadKey(Hash);
- return;
+ return true;
}
const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
ZenContentType ContentType = Payload.Location.GetContentType();
@@ -1781,11 +1781,12 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
if (!ValidateIoBuffer(ContentType, Buffer))
{
ReportBadKey(Hash);
- return;
+ return true;
}
+ return true;
};
- const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void {
+ const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
Ctx.ThrowIfDeadlineExpired();
ChunkCount.fetch_add(1);
@@ -1795,7 +1796,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
if (!Buffer)
{
ReportBadKey(Hash);
- return;
+ return true;
}
const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
ZenContentType ContentType = Payload.Location.GetContentType();
@@ -1803,11 +1804,14 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
if (!ValidateIoBuffer(ContentType, Buffer))
{
ReportBadKey(Hash);
- return;
+ return true;
}
+ return true;
};
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr);
+ m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
+ return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk);
+ });
}
catch (ScrubDeadlineExpiredException&)
{
@@ -3195,16 +3199,13 @@ public:
std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys;
{
- std::vector<IoHash> InlineKeys;
- std::unordered_map<uint32_t, std::size_t> BlockIndexToEntriesPerBlockIndex;
- struct InlineEntry
- {
- uint32_t InlineKeyIndex;
- uint32_t Offset;
- uint32_t Size;
- };
- std::vector<std::vector<InlineEntry>> EntriesPerBlock;
+ std::vector<IoHash> InlineKeys;
+ std::vector<BlockStoreLocation> InlineLocations;
+ std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes;
+
{
+ std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes;
+
RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
for (const auto& Entry : m_CacheBucket.m_Index)
{
@@ -3231,56 +3232,43 @@ public:
}
BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment);
- InlineEntry UpdateEntry = {.InlineKeyIndex = gsl::narrow<uint32_t>(InlineKeys.size()),
- .Offset = gsl::narrow<uint32_t>(ChunkLocation.Offset),
- .Size = gsl::narrow<uint32_t>(ChunkLocation.Size)};
+ size_t ChunkIndex = InlineLocations.size();
+ InlineLocations.push_back(ChunkLocation);
InlineKeys.push_back(Key);
-
- if (auto It = BlockIndexToEntriesPerBlockIndex.find(ChunkLocation.BlockIndex);
- It != BlockIndexToEntriesPerBlockIndex.end())
+ if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end())
{
- EntriesPerBlock[It->second].emplace_back(UpdateEntry);
+ InlineBlockChunkIndexes[It->second].push_back(ChunkIndex);
}
else
{
- BlockIndexToEntriesPerBlockIndex.insert_or_assign(ChunkLocation.BlockIndex, EntriesPerBlock.size());
- EntriesPerBlock.emplace_back(std::vector<InlineEntry>{UpdateEntry});
+ BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size());
+ InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex});
}
}
}
- for (auto It : BlockIndexToEntriesPerBlockIndex)
+ for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes)
{
- uint32_t BlockIndex = It.first;
-
- Ref<BlockStoreFile> BlockFile = m_CacheBucket.m_BlockStore.GetBlockFile(BlockIndex);
- if (BlockFile)
- {
- size_t EntriesPerBlockIndex = It.second;
- std::vector<InlineEntry>& InlineEntries = EntriesPerBlock[EntriesPerBlockIndex];
-
- std::sort(InlineEntries.begin(), InlineEntries.end(), [&](const InlineEntry& Lhs, const InlineEntry& Rhs) -> bool {
- return Lhs.Offset < Rhs.Offset;
+ ZEN_ASSERT(!ChunkIndexes.empty());
+
+ bool Continue = m_CacheBucket.m_BlockStore.IterateBlock(
+ InlineLocations,
+ ChunkIndexes,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ZEN_UNUSED(ChunkIndex, Size);
+ GetAttachments(Data);
+ return !Ctx.IsCancelledFlag.load();
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ ZEN_UNUSED(ChunkIndex);
+ GetAttachments(File.GetChunk(Offset, Size).GetData());
+ return !Ctx.IsCancelledFlag.load();
});
- uint64_t BlockFileSize = BlockFile->FileSize();
- BasicFileBuffer BlockBuffer(BlockFile->GetBasicFile(), 32768);
- for (const InlineEntry& InlineEntry : InlineEntries)
- {
- if ((InlineEntry.Offset + InlineEntry.Size) <= BlockFileSize)
- {
- MemoryView ChunkView = BlockBuffer.MakeView(InlineEntry.Size, InlineEntry.Offset);
- if (ChunkView.GetSize() == InlineEntry.Size)
- {
- GetAttachments(ChunkView.GetData());
- }
- else
- {
- IoBuffer Buffer = BlockFile->GetChunk(InlineEntry.Offset, InlineEntry.Size);
- GetAttachments(Buffer.GetData());
- }
- }
- }
+ if (!Continue && Ctx.IsCancelledFlag.load())
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
+ return;
}
}
}
@@ -3293,12 +3281,10 @@ public:
}
IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(It.second, It.first);
- if (!Buffer)
+ if (Buffer)
{
- continue;
+ GetAttachments(Buffer.GetData());
}
-
- GetAttachments(Buffer.GetData());
}
}
}
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index d6e5efdaa..dd92483b6 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -8,9 +8,11 @@
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/scrubcontext.h>
#include <gsl/gsl-lite.hpp>
@@ -315,20 +317,51 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment));
}
}
- bool Continue = true;
- m_BlockStore.IterateChunks(
- FoundChunkLocations,
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- if (Data != nullptr)
- {
- Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
- }
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
- },
- OptionalWorkerPool);
- return Continue;
+
+ auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) {
+ return m_BlockStore.IterateBlock(
+ FoundChunkLocations,
+ ChunkIndexes,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ if (Data == nullptr)
+ {
+ return true;
+ }
+ return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
+ });
+ };
+
+ Latch WorkLatch(1);
+ std::atomic_bool AsyncContinue = true;
+ bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
+ if (OptionalWorkerPool)
+ {
+ WorkLatch.AddCount(1);
+ OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ bool Continue = DoOneBlock(ChunkIndexes);
+ if (!Continue)
+ {
+ AsyncContinue.store(false);
+ }
+ });
+ return AsyncContinue.load();
+ }
+ else
+ {
+ return DoOneBlock(ChunkIndexes);
+ }
+ });
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ return AsyncContinue.load() && Continue;
}
void
@@ -387,7 +420,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
{
// ChunkLocation out of range of stored blocks
BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
- return;
+ return true;
}
IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
@@ -398,10 +431,11 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
if (RawHash == Hash)
{
// TODO: this should also hash the (decompressed) contents
- return;
+ return true;
}
}
BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
+ return true;
};
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
@@ -421,13 +455,16 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
if (RawHash == Hash)
{
// TODO: this should also hash the (decompressed) contents
- return;
+ return true;
}
}
BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
+ return true;
};
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr);
+ m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
+ return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk);
+ });
}
catch (const ScrubDeadlineExpiredException&)
{
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 7ef2f7baa..a1e497533 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -110,7 +110,6 @@ private:
};
class BlockStoreCompactState;
-class WorkerThreadPool;
class BlockStore
{
@@ -130,9 +129,10 @@ public:
typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback;
typedef std::function<bool(const MovedChunksArray& MovedChunks, uint64_t FreedDiskSpace)> CompactCallback;
typedef std::function<uint64_t()> ClaimDiskReserveCallback;
- typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback;
- typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback;
+ typedef std::function<bool(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback;
+ typedef std::function<bool(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback;
typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback;
+ typedef std::function<bool(uint32_t BlockIndex, std::span<const size_t> ChunkIndexes)> IterateChunksCallback;
struct BlockUsageInfo
{
@@ -178,10 +178,12 @@ public:
const ReclaimCallback& ChangeCallback = [](const MovedChunksArray&, const ChunkIndexArray&) {},
const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; });
- void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
- const IterateChunksSmallSizeCallback& AsyncSmallSizeCallback,
- const IterateChunksLargeSizeCallback& AsyncLargeSizeCallback,
- WorkerThreadPool* OptionalWorkerPool);
+ bool IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocations, const IterateChunksCallback& Callback);
+
+ bool IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
+ std::span<const size_t> ChunkIndexes,
+ const IterateChunksSmallSizeCallback& SmallSizeCallback,
+ const IterateChunksLargeSizeCallback& LargeSizeCallback);
void CompactBlocks(
const BlockStoreCompactState& CompactState,
@@ -190,14 +192,11 @@ public:
const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; },
std::string_view LogPrefix = {});
- static const char* GetBlockFileExtension();
- static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex);
-
inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); }
- Ref<BlockStoreFile> GetBlockFile(uint32_t BlockIndex);
-
private:
+ static const char* GetBlockFileExtension();
+ static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex);
uint32_t GetFreeBlockIndex(uint32_t StartProbeIndex, RwLock::ExclusiveLockScope&, std::filesystem::path& OutBlockPath) const;
std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;