diff options
| author | Dan Engelbrecht <[email protected]> | 2024-05-17 10:55:56 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-05-17 10:55:56 +0200 |
| commit | baf9891624b758b686e6b1c284013bea08794d69 (patch) | |
| tree | 732f00270fa5264565a8535cea911d034fc12ba9 /src/zenstore/compactcas.cpp | |
| parent | safer partial requests (#82) (diff) | |
| download | zen-baf9891624b758b686e6b1c284013bea08794d69.tar.xz zen-baf9891624b758b686e6b1c284013bea08794d69.zip | |
refactor BlockStore IterateChunks (#77)
Improvement: Refactored IterateChunks to allow reuse in diskcachelayer and hide public GetBlockFile() function in BlockStore
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 73 |
1 files changed, 55 insertions, 18 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index d6e5efdaa..dd92483b6 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -8,9 +8,11 @@ #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> #include <zenstore/scrubcontext.h> #include <gsl/gsl-lite.hpp> @@ -315,20 +317,51 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment)); } } - bool Continue = true; - m_BlockStore.IterateChunks( - FoundChunkLocations, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - if (Data != nullptr) - { - Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size)); - } - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); - }, - OptionalWorkerPool); - return Continue; + + auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) { + return m_BlockStore.IterateBlock( + FoundChunkLocations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + if (Data == nullptr) + { + return true; + } + return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size)); + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); + }); + }; + + Latch WorkLatch(1); + std::atomic_bool AsyncContinue = true; + bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + if (!AsyncContinue) + { + return; + } + bool Continue = DoOneBlock(ChunkIndexes); + if (!Continue) + { + AsyncContinue.store(false); + } + }); + return AsyncContinue.load(); + } + else + { + return DoOneBlock(ChunkIndexes); + } + }); + WorkLatch.CountDown(); + WorkLatch.Wait(); + return AsyncContinue.load() && Continue; } void @@ -387,7 +420,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) { // ChunkLocation out of range of stored blocks BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); - return; + return true; } IoBuffer Buffer(IoBuffer::Wrap, Data, Size); @@ -398,10 +431,11 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) if (RawHash == Hash) { // TODO: this should also hash the (decompressed) contents - return; + return true; } } BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); + return true; }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { @@ -421,13 +455,16 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) if (RawHash == Hash) { // TODO: this should also hash the (decompressed) contents - return; + return true; } } BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); + return true; }; - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr); + m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { + return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk); + }); } catch (const ScrubDeadlineExpiredException&) { |