aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-05-17 10:55:56 +0200
committerGitHub Enterprise <[email protected]>2024-05-17 10:55:56 +0200
commitbaf9891624b758b686e6b1c284013bea08794d69 (patch)
tree732f00270fa5264565a8535cea911d034fc12ba9 /src/zenstore/compactcas.cpp
parentsafer partial requests (#82) (diff)
downloadzen-baf9891624b758b686e6b1c284013bea08794d69.tar.xz
zen-baf9891624b758b686e6b1c284013bea08794d69.zip
refactor BlockStore IterateChunks (#77)
Improvement: Refactored IterateChunks to allow reuse in diskcachelayer and hide public GetBlockFile() function in BlockStore
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--src/zenstore/compactcas.cpp73
1 files changed, 55 insertions, 18 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index d6e5efdaa..dd92483b6 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -8,9 +8,11 @@
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/scrubcontext.h>
#include <gsl/gsl-lite.hpp>
@@ -315,20 +317,51 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment));
}
}
- bool Continue = true;
- m_BlockStore.IterateChunks(
- FoundChunkLocations,
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- if (Data != nullptr)
- {
- Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
- }
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
- },
- OptionalWorkerPool);
- return Continue;
+
+ auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) {
+ return m_BlockStore.IterateBlock(
+ FoundChunkLocations,
+ ChunkIndexes,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ if (Data == nullptr)
+ {
+ return true;
+ }
+ return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
+ });
+ };
+
+ Latch WorkLatch(1);
+ std::atomic_bool AsyncContinue = true;
+ bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
+ if (OptionalWorkerPool)
+ {
+ WorkLatch.AddCount(1);
+ OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ bool Continue = DoOneBlock(ChunkIndexes);
+ if (!Continue)
+ {
+ AsyncContinue.store(false);
+ }
+ });
+ return AsyncContinue.load();
+ }
+ else
+ {
+ return DoOneBlock(ChunkIndexes);
+ }
+ });
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ return AsyncContinue.load() && Continue;
}
void
@@ -387,7 +420,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
{
// ChunkLocation out of range of stored blocks
BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
- return;
+ return true;
}
IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
@@ -398,10 +431,11 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
if (RawHash == Hash)
{
// TODO: this should also hash the (decompressed) contents
- return;
+ return true;
}
}
BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
+ return true;
};
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
@@ -421,13 +455,16 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
if (RawHash == Hash)
{
// TODO: this should also hash the (decompressed) contents
- return;
+ return true;
}
}
BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
+ return true;
};
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr);
+ m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
+ return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk);
+ });
}
catch (const ScrubDeadlineExpiredException&)
{