diff options
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 73 |
1 files changed, 55 insertions, 18 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index d6e5efdaa..dd92483b6 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -8,9 +8,11 @@ #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> #include <zenstore/scrubcontext.h> #include <gsl/gsl-lite.hpp> @@ -315,20 +317,51 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment)); } } - bool Continue = true; - m_BlockStore.IterateChunks( - FoundChunkLocations, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - if (Data != nullptr) - { - Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size)); - } - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); - }, - OptionalWorkerPool); - return Continue; + + auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) { + return m_BlockStore.IterateBlock( + FoundChunkLocations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + if (Data == nullptr) + { + return true; + } + return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size)); + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); + }); + }; + + Latch WorkLatch(1); + std::atomic_bool AsyncContinue = true; + bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + if (!AsyncContinue) + { + return; + } + bool Continue = DoOneBlock(ChunkIndexes); + if (!Continue) + { + AsyncContinue.store(false); + } + }); + return AsyncContinue.load(); + } + else + { + return DoOneBlock(ChunkIndexes); + } + }); + WorkLatch.CountDown(); + WorkLatch.Wait(); + return AsyncContinue.load() && Continue; } void @@ -387,7 +420,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) { // ChunkLocation out of range of stored blocks BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); - return; + return true; } IoBuffer Buffer(IoBuffer::Wrap, Data, Size); @@ -398,10 +431,11 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) if (RawHash == Hash) { // TODO: this should also hash the (decompressed) contents - return; + return true; } } BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); + return true; }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { @@ -421,13 +455,16 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) if (RawHash == Hash) { // TODO: this should also hash the (decompressed) contents - return; + return true; } } BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); + return true; }; - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr); + m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { + return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk); + }); } catch (const ScrubDeadlineExpiredException&) { |