aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-05-17 10:55:56 +0200
committerGitHub Enterprise <[email protected]>2024-05-17 10:55:56 +0200
commitbaf9891624b758b686e6b1c284013bea08794d69 (patch)
tree732f00270fa5264565a8535cea911d034fc12ba9 /src/zenstore/cache/cachedisklayer.cpp
parentsafer partial requests (#82) (diff)
downloadzen-baf9891624b758b686e6b1c284013bea08794d69.tar.xz
zen-baf9891624b758b686e6b1c284013bea08794d69.zip
refactor BlockStore IterateChunks (#77)
Improvement: Refactored IterateChunks to allow reuse in diskcachelayer and hide public GetBlockFile() function in BlockStore
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp106
1 files changed, 46 insertions, 60 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 6cb749b5a..2e307118b 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1754,7 +1754,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
}
}
- const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void {
+ const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
ChunkCount.fetch_add(1);
VerifiedChunkBytes.fetch_add(Size);
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
@@ -1762,18 +1762,18 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
// ChunkLocation out of range of stored blocks
ReportBadKey(Hash);
- return;
+ return true;
}
if (!Size)
{
ReportBadKey(Hash);
- return;
+ return true;
}
IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
if (!Buffer)
{
ReportBadKey(Hash);
- return;
+ return true;
}
const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
ZenContentType ContentType = Payload.Location.GetContentType();
@@ -1781,11 +1781,12 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
if (!ValidateIoBuffer(ContentType, Buffer))
{
ReportBadKey(Hash);
- return;
+ return true;
}
+ return true;
};
- const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void {
+ const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
Ctx.ThrowIfDeadlineExpired();
ChunkCount.fetch_add(1);
@@ -1795,7 +1796,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
if (!Buffer)
{
ReportBadKey(Hash);
- return;
+ return true;
}
const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
ZenContentType ContentType = Payload.Location.GetContentType();
@@ -1803,11 +1804,14 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
if (!ValidateIoBuffer(ContentType, Buffer))
{
ReportBadKey(Hash);
- return;
+ return true;
}
+ return true;
};
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr);
+ m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
+ return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk);
+ });
}
catch (ScrubDeadlineExpiredException&)
{
@@ -3195,16 +3199,13 @@ public:
std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys;
{
- std::vector<IoHash> InlineKeys;
- std::unordered_map<uint32_t, std::size_t> BlockIndexToEntriesPerBlockIndex;
- struct InlineEntry
- {
- uint32_t InlineKeyIndex;
- uint32_t Offset;
- uint32_t Size;
- };
- std::vector<std::vector<InlineEntry>> EntriesPerBlock;
+ std::vector<IoHash> InlineKeys;
+ std::vector<BlockStoreLocation> InlineLocations;
+ std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes;
+
{
+ std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes;
+
RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
for (const auto& Entry : m_CacheBucket.m_Index)
{
@@ -3231,56 +3232,43 @@ public:
}
BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment);
- InlineEntry UpdateEntry = {.InlineKeyIndex = gsl::narrow<uint32_t>(InlineKeys.size()),
- .Offset = gsl::narrow<uint32_t>(ChunkLocation.Offset),
- .Size = gsl::narrow<uint32_t>(ChunkLocation.Size)};
+ size_t ChunkIndex = InlineLocations.size();
+ InlineLocations.push_back(ChunkLocation);
InlineKeys.push_back(Key);
-
- if (auto It = BlockIndexToEntriesPerBlockIndex.find(ChunkLocation.BlockIndex);
- It != BlockIndexToEntriesPerBlockIndex.end())
+ if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end())
{
- EntriesPerBlock[It->second].emplace_back(UpdateEntry);
+ InlineBlockChunkIndexes[It->second].push_back(ChunkIndex);
}
else
{
- BlockIndexToEntriesPerBlockIndex.insert_or_assign(ChunkLocation.BlockIndex, EntriesPerBlock.size());
- EntriesPerBlock.emplace_back(std::vector<InlineEntry>{UpdateEntry});
+ BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size());
+ InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex});
}
}
}
- for (auto It : BlockIndexToEntriesPerBlockIndex)
+ for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes)
{
- uint32_t BlockIndex = It.first;
-
- Ref<BlockStoreFile> BlockFile = m_CacheBucket.m_BlockStore.GetBlockFile(BlockIndex);
- if (BlockFile)
- {
- size_t EntriesPerBlockIndex = It.second;
- std::vector<InlineEntry>& InlineEntries = EntriesPerBlock[EntriesPerBlockIndex];
-
- std::sort(InlineEntries.begin(), InlineEntries.end(), [&](const InlineEntry& Lhs, const InlineEntry& Rhs) -> bool {
- return Lhs.Offset < Rhs.Offset;
+ ZEN_ASSERT(!ChunkIndexes.empty());
+
+ bool Continue = m_CacheBucket.m_BlockStore.IterateBlock(
+ InlineLocations,
+ ChunkIndexes,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ZEN_UNUSED(ChunkIndex, Size);
+ GetAttachments(Data);
+ return !Ctx.IsCancelledFlag.load();
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ ZEN_UNUSED(ChunkIndex);
+ GetAttachments(File.GetChunk(Offset, Size).GetData());
+ return !Ctx.IsCancelledFlag.load();
});
- uint64_t BlockFileSize = BlockFile->FileSize();
- BasicFileBuffer BlockBuffer(BlockFile->GetBasicFile(), 32768);
- for (const InlineEntry& InlineEntry : InlineEntries)
- {
- if ((InlineEntry.Offset + InlineEntry.Size) <= BlockFileSize)
- {
- MemoryView ChunkView = BlockBuffer.MakeView(InlineEntry.Size, InlineEntry.Offset);
- if (ChunkView.GetSize() == InlineEntry.Size)
- {
- GetAttachments(ChunkView.GetData());
- }
- else
- {
- IoBuffer Buffer = BlockFile->GetChunk(InlineEntry.Offset, InlineEntry.Size);
- GetAttachments(Buffer.GetData());
- }
- }
- }
+ if (!Continue && Ctx.IsCancelledFlag.load())
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
+ return;
}
}
}
@@ -3293,12 +3281,10 @@ public:
}
IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(It.second, It.first);
- if (!Buffer)
+ if (Buffer)
{
- continue;
+ GetAttachments(Buffer.GetData());
}
-
- GetAttachments(Buffer.GetData());
}
}
}