diff options
| author | Dan Engelbrecht <[email protected]> | 2025-11-27 16:05:56 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-11-27 16:05:56 +0100 |
| commit | 4984e8cd5c38cf77c8cb978f75f808bce0577f2d (patch) | |
| tree | c298828c6290a669500788f96f8ea25be41ff88a /src/zenstore/compactcas.cpp | |
| parent | remove bad assert (#670) (diff) | |
| download | zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.tar.xz zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.zip | |
automatic scrub on startup (#667)
- Improvement: Deeper validation of data when scrub is activated (cas/cache/project)
- Improvement: Enabled more multi-threading when running scrub operations
- Improvement: Added a way to force a scrub operation at startup for a new release, using the ZEN_DATA_FORCE_SCRUB_VERSION variable in xmake.lua
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 49 |
1 file changed, 26 insertions, 23 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index e1f17fbb9..a5de5c448 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -557,6 +557,10 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) std::vector<BlockStoreLocation> ChunkLocations; std::vector<IoHash> ChunkIndexToChunkHash; + std::atomic<bool> Abort; + std::atomic<bool> Pause; + ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog); + try { RwLock::SharedLockScope _(m_LocationMapLock); @@ -589,58 +593,57 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) } IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + + if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Buffer)), &Hash)) { - if (RawHash == Hash) - { - // TODO: this should also hash the (decompressed) contents - return true; - } + BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); } - BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); return true; }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - Ctx.ThrowIfDeadlineExpired(); - ChunkCount.fetch_add(1); ChunkBytes.fetch_add(Size); const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); - IoHash RawHash; - uint64_t RawSize; - // TODO: Add API to verify compressed buffer without having to memory-map the whole file - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Buffer)), &Hash)) { - if (RawHash == Hash) - { - // TODO: this should also hash the (decompressed) contents - return true; - } + BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); } - BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); return true; }; 
m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { - return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0); + Ctx.ThrowIfDeadlineExpired(); + Work.ScheduleWork( + Ctx.ThreadPool(), + [&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0); + } + }); + return !Abort; }); + Work.Wait(); } catch (const ScrubDeadlineExpiredException&) { ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + Abort = true; + Work.Wait(); } Ctx.ReportScrubbed(ChunkCount, ChunkBytes); if (!BadKeys.empty()) { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); + ZEN_WARN("Scrubbing found {} bad chunks out of {} in '{}'", + BadKeys.size(), + ChunkCount.load(), + m_RootDirectory / m_ContainerBaseName); if (Ctx.RunRecovery()) { |